Skip to main content

tycho_collator/manager/
mod.rs

1use std::collections::{BTreeMap, VecDeque, hash_map};
2use std::sync::Arc;
3
4use ahash::HashMapExt;
5use anyhow::{Context, Result, anyhow, bail};
6use async_trait::async_trait;
7use futures_util::TryFutureExt;
8use parking_lot::{Mutex, RwLock};
9use tokio::sync::Notify;
10use tokio_util::sync::CancellationToken;
11use tycho_block_util::block::{TopBlocks, ValidatorSubsetInfo, calc_next_block_id_short};
12use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
13use tycho_block_util::state::ShardStateStuff;
14use tycho_core::global_config::MempoolGlobalConfig;
15use tycho_core::storage::{LoadStateHint, StateNotFound};
16use tycho_crypto::ed25519::KeyPair;
17use tycho_types::models::{
18    BlockId, BlockIdShort, CollationConfig, GlobalCapabilities, ProcessedUptoInfo, ShardIdent,
19    ValidatorDescription,
20};
21use tycho_util::futures::AwaitBlocking;
22use tycho_util::metrics::HistogramGuard;
23use tycho_util::{DashMapEntry, FastDashMap, FastHashMap, FastHashSet};
24use types::{
25    ActiveCollator, ActiveSync, BlockCacheEntry, BlockCacheEntryData, BlockCacheStoreResult,
26    CollatedBlockInfo, CollationState, CollationStatus, CollatorState, HandledBlockFromBcCtx,
27    ImportedAnchorEvent, McBlockSubgraph, NextCollationStep,
28};
29
30use self::blocks_cache::BlocksCache;
31use self::types::{BlockCacheKey, CandidateStatus, CollationSyncState, McBlockSubgraphExtract};
32use self::utils::find_us_in_collators_set;
33use crate::collator::{
34    CollationCancelReason, Collator, CollatorContext, CollatorEventListener, CollatorFactory,
35    ForceMasterCollation,
36};
37use crate::internal_queue::types::diff::{DiffZone, QueueDiffWithMessages};
38use crate::internal_queue::types::message::EnqueuedMessage;
39use crate::internal_queue::types::stats::DiffStatistics;
40use crate::mempool::{
41    MempoolAdapter, MempoolAdapterFactory, MempoolAnchor, MempoolAnchorId, MempoolEventListener,
42    StateUpdateContext,
43};
44use crate::queue_adapter::MessageQueueAdapter;
45use crate::state_node::{StateNodeAdapter, StateNodeAdapterFactory, StateNodeEventListener};
46use crate::types::processed_upto::{
47    BlockSeqno, ProcessedUptoInfoExtension, ProcessedUptoInfoStuff, find_min_processed_to_by_shards,
48};
49use crate::types::{
50    BlockCollationResult, BlockIdExt, CollationSessionId, CollationSessionInfo, CollatorConfig,
51    DebugIter, DisplayAsShortId, DisplayBlockIdsIntoIter, McData, ProcessedToByPartitions,
52    ShardDescriptionShort, ShardDescriptionShortExt, ShardHashesExt, TopBlockId, TopBlockIdUpdated,
53};
54use crate::utils::async_dispatcher::{AsyncDispatcher, STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE};
55use crate::utils::block::detect_top_processed_to_anchor;
56use crate::utils::shard::calc_split_merge_actions;
57use crate::utils::vset_cache::ValidatorSetCache;
58use crate::validator::{AddSession, ValidationStatus, Validator};
59use crate::{method_to_async_closure, tracing_targets};
60
61mod blocks_cache;
62mod types;
63mod utils;
64
65#[cfg(test)]
66#[path = "tests/manager_tests.rs"]
67pub(super) mod tests;
68
69const BLOCKS_FROM_BC_QUEUE_LIMIT: usize = 1000;
70
71pub struct RunningCollationManager<CF, V>
72where
73    CF: CollatorFactory,
74{
75    cancel_async_tasks: CancellationToken,
76    dispatcher: AsyncDispatcher<CollationManager<CF, V>>,
77    state_node_adapter: Arc<dyn StateNodeAdapter>,
78    mpool_adapter: Arc<dyn MempoolAdapter>,
79    mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
80}
81
82impl<CF: CollatorFactory, V> RunningCollationManager<CF, V> {
83    pub fn dispatcher(&self) -> &AsyncDispatcher<CollationManager<CF, V>> {
84        &self.dispatcher
85    }
86
87    pub fn state_node_adapter(&self) -> &Arc<dyn StateNodeAdapter> {
88        &self.state_node_adapter
89    }
90
91    pub fn mpool_adapter(&self) -> &Arc<dyn MempoolAdapter> {
92        &self.mpool_adapter
93    }
94
95    pub fn mq_adapter(&self) -> &Arc<dyn MessageQueueAdapter<EnqueuedMessage>> {
96        &self.mq_adapter
97    }
98}
99
100impl<CF: CollatorFactory, V> Drop for RunningCollationManager<CF, V> {
101    fn drop(&mut self) {
102        self.cancel_async_tasks.cancel();
103    }
104}
105
106pub struct CollationManager<CF, V>
107where
108    CF: CollatorFactory,
109{
110    keypair: Arc<KeyPair>,
111    config: Arc<CollatorConfig>,
112
113    dispatcher: Arc<AsyncDispatcher<Self>>,
114    state_node_adapter: Arc<dyn StateNodeAdapter>,
115    mpool_adapter: Arc<dyn MempoolAdapter>,
116    mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
117
118    collator_factory: CF,
119    validator: Arc<V>,
120
121    active_collation_sessions: RwLock<FastHashMap<ShardIdent, Arc<CollationSessionInfo>>>,
122    collation_sessions_to_finish: FastDashMap<CollationSessionId, Arc<CollationSessionInfo>>,
123    active_collators: FastDashMap<ShardIdent, ActiveCollator<Arc<CF::Collator>>>,
124    collators_to_stop: FastDashMap<CollationSessionId, ActiveCollator<Arc<CF::Collator>>>,
125
126    blocks_cache: BlocksCache,
127
128    /// Queue of received blocks from bc
129    blocks_from_bc_queue_sender: tokio::sync::mpsc::Sender<HandledBlockFromBcCtx>,
130
131    /// Used to sync tasks that may cause `sync_to_applied_mc_block`
132    ready_to_sync: Arc<Notify>,
133
134    /// block id of last processed master state (after collation or on sync)
135    last_processed_mc_block_id: Arc<Mutex<Option<BlockId>>>,
136
137    /// state to sync collation between master and shard chains
138    collation_sync_state: Arc<Mutex<CollationSyncState>>,
139
140    /// Cache for validator sets from config
141    validator_set_cache: ValidatorSetCache,
142
143    /// Mempool config override for a new genesis
144    mempool_config_override: Option<MempoolGlobalConfig>,
145
146    /// `McData` which processing was delayed until block is validated.
147    delayed_mc_state_update: Arc<Mutex<Option<Arc<McData>>>>,
148}
149
150#[async_trait]
151impl<CF, V> MempoolEventListener for AsyncDispatcher<CollationManager<CF, V>>
152where
153    CF: CollatorFactory,
154    V: Validator,
155{
156    async fn on_new_anchor(&self, anchor: Arc<MempoolAnchor>) -> Result<()> {
157        self.spawn_task(method_to_async_closure!(
158            process_new_anchor_from_mempool,
159            anchor
160        ))
161        .await
162    }
163}
164
165#[async_trait]
166impl<CF, V> StateNodeEventListener for AsyncDispatcher<CollationManager<CF, V>>
167where
168    CF: CollatorFactory,
169    V: Validator,
170{
171    async fn on_block_accepted(&self, state: &ShardStateStuff) -> Result<()> {
172        let processed_upto = state.state().processed_upto.load()?;
173
174        metrics_report_last_applied_block_and_anchor(state, &processed_upto)?;
175
176        let state_cloned = state.clone();
177
178        self.spawn_task(method_to_async_closure!(
179            detect_top_processed_to_anchor_and_notify_mempool,
180            state_cloned,
181            processed_upto.get_min_externals_processed_to()?.0
182        ))
183        .await?;
184
185        let state_cloned = state.clone();
186        self.spawn_task(move |worker| {
187            Box::pin(async move { worker.cancel_validation_sessions_until_block(state_cloned) })
188        })
189        .await?;
190
191        Ok(())
192    }
193
194    async fn on_block_accepted_external(
195        &self,
196        mc_block_id: &BlockId,
197        state: &ShardStateStuff,
198    ) -> Result<()> {
199        let processed_upto = state.state().processed_upto.load()?;
200
201        metrics_report_last_applied_block_and_anchor(state, &processed_upto)?;
202
203        let state = state.clone();
204        let ctx = HandledBlockFromBcCtx {
205            mc_block_id: *mc_block_id,
206            state,
207            processed_upto,
208        };
209
210        self.enqueue_task(method_to_async_closure!(enqueue_handle_block_from_bc, ctx))
211            .await?;
212
213        Ok(())
214    }
215}
216
217fn metrics_report_last_applied_block_and_anchor(
218    state: &ShardStateStuff,
219    processed_upto: &ProcessedUptoInfo,
220) -> Result<()> {
221    let block_id = state.block_id();
222    let labels = [("workchain", block_id.shard.workchain().to_string())];
223
224    let block_ct = state.get_gen_chain_time();
225    let processed_to_anchor_id = processed_upto.get_min_externals_processed_to()?.0;
226
227    metrics::gauge!("tycho_last_applied_block_seqno", &labels).set(block_id.seqno);
228    metrics::gauge!("tycho_last_processed_to_anchor_id", &labels).set(processed_to_anchor_id);
229
230    tracing::info!(target: tracing_targets::COLLATION_MANAGER,
231        block_id = %DisplayAsShortId(block_id),
232        block_ct,
233        processed_to_anchor_id = processed_to_anchor_id,
234        "last applied block",
235    );
236
237    Ok(())
238}
239
240#[async_trait]
241impl<CF, V> CollatorEventListener for AsyncDispatcher<CollationManager<CF, V>>
242where
243    CF: CollatorFactory,
244    V: Validator,
245{
246    async fn on_skipped(
247        &self,
248        prev_mc_block_id: BlockId,
249        next_block_id_short: BlockIdShort,
250        anchor_chain_time: u64,
251        force_mc_block: ForceMasterCollation,
252        collation_config: Arc<CollationConfig>,
253    ) -> Result<()> {
254        self.spawn_task(method_to_async_closure!(
255            handle_collation_skipped,
256            prev_mc_block_id,
257            next_block_id_short,
258            anchor_chain_time,
259            force_mc_block,
260            collation_config
261        ))
262        .await
263    }
264
265    async fn on_cancelled(
266        &self,
267        prev_mc_block_id: BlockId,
268        next_block_id_short: BlockIdShort,
269        cancel_reason: CollationCancelReason,
270    ) -> Result<()> {
271        self.spawn_task(method_to_async_closure!(
272            handle_collation_cancelled,
273            prev_mc_block_id,
274            next_block_id_short,
275            cancel_reason
276        ))
277        .await
278    }
279
280    async fn on_block_candidate(&self, collation_result: BlockCollationResult) -> Result<()> {
281        self.spawn_task(method_to_async_closure!(
282            handle_collated_block_candidate,
283            collation_result
284        ))
285        .await
286    }
287
288    async fn on_collator_stopped(&self, stop_key: CollationSessionId) -> Result<()> {
289        self.spawn_task(move |worker| {
290            Box::pin(async move { worker.handle_collator_stopped(stop_key) })
291        })
292        .await
293    }
294}
295
296impl<CF, V> CollationManager<CF, V>
297where
298    CF: CollatorFactory,
299    V: Validator,
300{
301    #[allow(clippy::too_many_arguments)]
302    pub fn start<STF, MPF>(
303        keypair: Arc<KeyPair>,
304        config: CollatorConfig,
305        mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
306        state_node_adapter_factory: STF,
307        mpool_adapter_factory: MPF,
308        validator: V,
309        collator_factory: CF,
310        mempool_config_override: Option<MempoolGlobalConfig>,
311    ) -> RunningCollationManager<CF, V>
312    where
313        STF: StateNodeAdapterFactory,
314        MPF: MempoolAdapterFactory,
315    {
316        tracing::info!(target: tracing_targets::COLLATION_MANAGER, "Creating collation manager...");
317
318        // create dispatcher for own tasks
319        let (dispatcher, dispatcher_ctx) = AsyncDispatcher::new(
320            "collation_manager_async_dispatcher",
321            STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE,
322        );
323        let arc_dispatcher = Arc::new(dispatcher.clone());
324
325        // create state node adapter
326        let state_node_adapter =
327            Arc::new(state_node_adapter_factory.create(arc_dispatcher.clone()));
328
329        // create mempool adapter
330        let mpool_adapter = mpool_adapter_factory.create(arc_dispatcher.clone());
331
332        let validator = Arc::new(validator);
333
334        let (blocks_from_bc_queue_sender, blocks_from_bc_queue_receiver) =
335            tokio::sync::mpsc::channel::<HandledBlockFromBcCtx>(BLOCKS_FROM_BC_QUEUE_LIMIT);
336
337        let ready_to_sync = Arc::new(Notify::new());
338        ready_to_sync.notify_one();
339
340        let blocks_cache = BlocksCache::new(state_node_adapter.zerostate_id());
341
342        let collation_manager = Self {
343            keypair,
344            config: Arc::new(config),
345            dispatcher: arc_dispatcher.clone(),
346            state_node_adapter: state_node_adapter.clone(),
347            mpool_adapter: mpool_adapter.clone(),
348            mq_adapter: mq_adapter.clone(),
349            collator_factory,
350            validator,
351
352            active_collation_sessions: Default::default(),
353            collation_sessions_to_finish: Default::default(),
354            active_collators: Default::default(),
355            collators_to_stop: Default::default(),
356
357            blocks_cache,
358
359            blocks_from_bc_queue_sender,
360
361            ready_to_sync,
362
363            last_processed_mc_block_id: Default::default(),
364
365            collation_sync_state: Default::default(),
366
367            validator_set_cache: Default::default(),
368
369            mempool_config_override,
370
371            delayed_mc_state_update: Arc::new(Mutex::new(None)),
372        };
373        let collation_manager = Arc::new(collation_manager);
374
375        let cancel_async_tasks = CancellationToken::new();
376
377        // start additional async tasks
378        tokio::spawn(
379            collation_manager
380                .clone()
381                .process_handle_block_from_bc_queue(
382                    blocks_from_bc_queue_receiver,
383                    cancel_async_tasks.clone(),
384                )
385                .map_err(|e| {
386                    tracing::error!("initial process_handle_block_from_bc_queue failed: {e:?}");
387                }),
388        );
389
390        // start tasks dispatcher
391        arc_dispatcher.run(collation_manager, dispatcher_ctx);
392        tracing::trace!(target: tracing_targets::COLLATION_MANAGER, "Tasks dispatchers started");
393
394        tracing::info!(target: tracing_targets::COLLATION_MANAGER, "Collation manager created");
395
396        RunningCollationManager {
397            cancel_async_tasks,
398            dispatcher,
399            state_node_adapter,
400            mpool_adapter,
401            mq_adapter,
402        }
403    }
404
405    /// (TODO) Check sync status between mempool and blockchain state
406    /// and pause collation when we are far behind other nodesб
407    /// jusct sync blcoks from blockchain
408    pub async fn process_new_anchor_from_mempool(&self, _anchor: Arc<MempoolAnchor>) -> Result<()> {
409        // TODO: make real implementation, currently does nothing
410        Ok(())
411    }
412
413    /// Tries to determine top anchor that was processed to
414    /// by info from received state and notify mempool
415    #[tracing::instrument(skip_all, fields(block_id = %state.block_id().as_short_id()))]
416    async fn detect_top_processed_to_anchor_and_notify_mempool(
417        &self,
418        state: ShardStateStuff,
419        mc_processed_to_anchor_id: MempoolAnchorId,
420    ) -> Result<()> {
421        // will make this only for master blocks
422        if !state.block_id().is_masterchain() {
423            return Ok(());
424        }
425
426        self.detect_top_processed_to_anchor_and_notify_mempool_impl(
427            state.block_id().seqno,
428            state
429                .shards()?
430                .as_vec()?
431                .into_iter()
432                .map(|(_, descr)| descr),
433            mc_processed_to_anchor_id,
434        )
435        .await
436    }
437
438    async fn detect_top_processed_to_anchor_and_notify_mempool_impl<I>(
439        &self,
440        mc_block_seqno: BlockSeqno,
441        mc_top_shards: I,
442        mc_processed_to_anchor_id: MempoolAnchorId,
443    ) -> Result<()>
444    where
445        I: Iterator<Item = ShardDescriptionShort>,
446    {
447        let top_processed_to_anchor =
448            detect_top_processed_to_anchor(mc_top_shards, mc_processed_to_anchor_id);
449
450        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
451            top_processed_to_anchor,
452            mc_processed_to_anchor_id,
453            "detected minimal top_processed_to_anchor, will notify mempool",
454
455        );
456
457        self.notify_top_processed_to_anchor_to_mempool(mc_block_seqno, top_processed_to_anchor)
458            .await?;
459
460        Ok(())
461    }
462
463    async fn notify_top_processed_to_anchor_to_mempool(
464        &self,
465        mc_block_seqno: BlockSeqno,
466        top_processed_to_anchor: MempoolAnchorId,
467    ) -> Result<()> {
468        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
469            top_processed_to_anchor,
470            "will notify top_processed_to_anchor to mempool",
471        );
472
473        self.mpool_adapter
474            .handle_signed_mc_block(mc_block_seqno)
475            .await?;
476
477        self.mpool_adapter
478            .clear_anchors_cache(top_processed_to_anchor)?;
479
480        Ok(())
481    }
482
483    #[tracing::instrument(skip_all, fields(block_id = %state.block_id().as_short_id()))]
484    fn cancel_validation_sessions_until_block(&self, state: ShardStateStuff) -> Result<()> {
485        let block_id = state.block_id().as_short_id();
486
487        let session_id = self
488            .active_collation_sessions
489            .read()
490            .get(&block_id.shard)
491            .map(|session_info| session_info.get_validation_session_id());
492
493        self.validator.cancel_validation(&block_id, session_id)?;
494        Ok(())
495    }
496
497    #[tracing::instrument(skip_all, fields(block_id = %block_id))]
498    fn commit_block_queue_diff(
499        mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
500        block_id: &BlockId,
501        top_shard_blocks_info: &[TopBlockIdUpdated],
502        partitions: &FastHashSet<QueuePartitionIdx>,
503    ) -> Result<()> {
504        if !block_id.is_masterchain() {
505            return Ok(());
506        }
507
508        let _histogram = HistogramGuard::begin("tycho_collator_commit_queue_diffs_time");
509
510        let mut top_blocks = top_shard_blocks_info.to_vec();
511        top_blocks.push(TopBlockIdUpdated {
512            block: TopBlockId {
513                ref_by_mc_seqno: block_id.seqno,
514                block_id: *block_id,
515            },
516            updated: true,
517        });
518
519        if let Err(err) = mq_adapter.commit_diff(top_blocks, partitions) {
520            bail!(
521                "Error committing message queue diff of block ({}): {:?}",
522                block_id,
523                err,
524            )
525        }
526
527        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
528            "message queue diff was committed",
529        );
530
531        Ok(())
532    }
533
534    /// Returns `BlockId` if diff was applied.
535    /// * `first_required_diffs` - contains ids of known first required diffs for queue for each shard
536    fn apply_block_queue_diff_from_entry_stuff(
537        state_node_adapter: &dyn StateNodeAdapter,
538        mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
539        block_entry: &BlockCacheEntry,
540        min_processed_to: Option<&QueueKey>,
541        first_required_diffs: &mut FastHashMap<ShardIdent, BlockId>,
542    ) -> Result<Option<BlockId>> {
543        let block_id = block_entry.block_id;
544
545        // TODO: error if <
546        if block_entry.ref_by_mc_seqno <= state_node_adapter.zerostate_id().seqno {
547            return Ok(None);
548        }
549
550        let queue_diff = match &block_entry.data {
551            BlockCacheEntryData::Collated {
552                candidate_stuff, ..
553            } => &candidate_stuff.candidate.queue_diff_aug.data,
554            BlockCacheEntryData::Received { queue_diff, .. } => queue_diff,
555        };
556
557        // skip diff below min processed to
558        if let Some(min_pt) = min_processed_to
559            && queue_diff.as_ref().max_message <= *min_pt
560        {
561            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
562                "Skipping diff for block {}: max_message {} <= min_processed_to {}",
563                block_id.as_short_id(),
564                queue_diff.as_ref().max_message,
565                min_pt,
566            );
567            return Ok(None);
568        }
569
570        // skip already applied diff
571        if mq_adapter.is_diff_exists(&block_id.as_short_id())? {
572            tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
573                queue_diff_block_id = %block_id.as_short_id(),
574                "queue diff apply skipped because it is already applied",
575            );
576            // if diff for block from bc already applied
577            // then we should check sequense for each next diff
578            first_required_diffs.insert(block_id.shard, BlockId::default());
579            return Ok(None);
580        }
581
582        // load out_msg
583        let out_msgs = match &block_entry.data {
584            BlockCacheEntryData::Collated {
585                candidate_stuff, ..
586            } => &candidate_stuff
587                .candidate
588                .block
589                .data
590                .load_extra()?
591                .out_msg_description
592                .load()?,
593            BlockCacheEntryData::Received { out_msgs, .. } => &out_msgs.load()?,
594        };
595
596        let queue_diff_with_msgs = QueueDiffWithMessages::from_queue_diff(queue_diff, out_msgs)?;
597
598        let statistics = DiffStatistics::from_diff(
599            &queue_diff_with_msgs,
600            queue_diff.block_id().shard,
601            queue_diff.as_ref().min_message,
602            queue_diff.as_ref().max_message,
603        );
604
605        let check_sequence = match first_required_diffs.get(&block_id.shard).copied() {
606            None => {
607                // if first required diff was not detected before
608                // we consider that current is first
609                first_required_diffs.insert(block_id.shard, block_id);
610                None
611            }
612            Some(id) if id == block_id => None,
613            _ => Some(DiffZone::Both),
614        };
615
616        mq_adapter
617            .apply_diff(
618                queue_diff_with_msgs,
619                queue_diff.block_id().as_short_id(),
620                queue_diff.diff_hash(),
621                statistics,
622                check_sequence,
623            )
624            .context("apply_block_queue_diff_from_entry_stuff")?;
625
626        Ok(Some(block_id))
627    }
628
629    #[tracing::instrument(skip_all, fields(next_block_id = %next_block_id_short))]
630    async fn handle_collation_cancelled(
631        self: &Arc<Self>,
632        _prev_mc_block_id: BlockId,
633        next_block_id_short: BlockIdShort,
634        cancel_reason: CollationCancelReason,
635    ) -> Result<()> {
636        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
637            ?cancel_reason,
638            "start handle collation cancelled",
639        );
640
641        // NOTE: when collation cancelled from collator it guarantees
642        //      that uncommitted queue diff was not saved
643        match cancel_reason {
644            CollationCancelReason::AnchorNotFound(_)
645            | CollationCancelReason::NextAnchorNotFound(_)
646            | CollationCancelReason::ExternalCancel
647            | CollationCancelReason::DiffNotFoundInQueue(_) => {
648                // sync cache and collator state access
649                self.ready_to_sync.notified().await;
650                scopeguard::defer!(self.ready_to_sync.notify_one());
651
652                // mark collator as cancelled
653                self.set_collator_state(&next_block_id_short.shard, |ac| {
654                    ac.state = CollatorState::Cancelled;
655                });
656
657                // run sync if all collators cancelled or waiting
658                let has_active = self.active_collators.iter().any(|ac| {
659                    ac.state == CollatorState::Active || ac.state == CollatorState::CancelPending
660                });
661                if !has_active {
662                    tracing::info!(target: tracing_targets::COLLATION_MANAGER,
663                        "no active collators in shards and masterchain, \
664                        will run sync to last applied mc block",
665                    );
666
667                    // get info about applied mc blocks in cache
668                    let (last_collated_mc_block_id, applied_range) = self
669                        .blocks_cache
670                        .get_last_collated_block_and_applied_mc_queue_range();
671
672                    // run sync if have applied mc blocks
673                    self.sync_to_applied_mc_block_if_exist(
674                        last_collated_mc_block_id,
675                        applied_range,
676                    )
677                    .await?;
678                }
679            }
680        }
681        Ok(())
682    }
683
684    #[tracing::instrument(skip_all, fields(next_block_id = %next_block_id_short, ct = anchor_chain_time, ?force_mc_block))]
685    async fn handle_collation_skipped(
686        &self,
687        prev_mc_block_id: BlockId,
688        next_block_id_short: BlockIdShort,
689        anchor_chain_time: u64,
690        force_mc_block: ForceMasterCollation,
691        collation_config: Arc<CollationConfig>,
692    ) -> Result<()> {
693        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
694            "will run next collation step",
695        );
696
697        // sync cache and collator state access
698        self.ready_to_sync.notified().await;
699        scopeguard::defer!(self.ready_to_sync.notify_one());
700
701        // cancel collator if cancel was requested during active collation try
702        let updated_collator_state = self.set_collator_state(&next_block_id_short.shard, |ac| {
703            if ac.state == CollatorState::CancelPending {
704                ac.state = CollatorState::Cancelled;
705            }
706        });
707        if updated_collator_state == Some(CollatorState::Cancelled) {
708            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
709                shard_id = %next_block_id_short.shard,
710                "collator was cancelled before",
711            );
712            return Ok(());
713        }
714
715        self.run_next_collation_step(
716            &prev_mc_block_id,
717            next_block_id_short.shard,
718            None,
719            DetectNextCollationStepContext::new(
720                anchor_chain_time,
721                force_mc_block,
722                collation_config.mc_block_min_interval_ms as _,
723                collation_config.mc_block_max_interval_ms as _,
724                None,
725            ),
726        )
727        .await
728    }
729
730    /// 1. Check if should collate master
731    /// 2. And schedule master block collation
732    /// 3. Or schedule next collation attempt in current shard
733    async fn run_next_collation_step(
734        &self,
735        prev_mc_block_id: &BlockId,
736        shard_id: ShardIdent,
737        trigger_shard_block_id_opt: Option<BlockId>,
738        ctx: DetectNextCollationStepContext,
739    ) -> Result<()> {
740        let next_step = Self::detect_next_collation_step(
741            &mut self.collation_sync_state.lock(),
742            self.active_collation_sessions
743                .read()
744                .keys()
745                .cloned()
746                .collect(),
747            shard_id,
748            ctx,
749        );
750
751        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
752            next_step = ?next_step,
753            "detected next collation step",
754        );
755
756        match next_step {
757            NextCollationStep::CollateMaster(next_mc_block_chain_time) => {
758                if !shard_id.is_masterchain() {
759                    // shard collator will wait and master collator will work
760                    self.set_collator_state(&shard_id, |ac| ac.state = CollatorState::Waiting);
761                }
762
763                self.set_collator_state(&ShardIdent::MASTERCHAIN, |ac| {
764                    ac.state = CollatorState::Active;
765                });
766
767                self.enqueue_mc_block_collation(
768                    prev_mc_block_id.get_next_id_short(),
769                    next_mc_block_chain_time,
770                    trigger_shard_block_id_opt,
771                )
772                .await?;
773            }
774            NextCollationStep::WaitForMasterStatus | NextCollationStep::WaitForShardStatus => {
775                // current shard collator will wait
776                self.set_collator_state(&shard_id, |ac| ac.state = CollatorState::Waiting);
777            }
778            NextCollationStep::ResumeAttemptsIn(shards_to_resume_attempts) => {
779                // if should not collate master block
780                // then continue collation by running `try_collate` that will
781                // - in masterchain: just import next anchor
782                // - in workchain: run next attempt to collate shard block
783                let mut current_shard_should_wait = true;
784                for shard_ident in shards_to_resume_attempts {
785                    if shard_ident == shard_id {
786                        current_shard_should_wait = false;
787                    }
788                    self.set_collator_state(&shard_ident, |ac| ac.state = CollatorState::Active);
789                    self.enqueue_try_collate(&shard_ident).await?;
790                }
791                if current_shard_should_wait {
792                    self.set_collator_state(&shard_id, |ac| ac.state = CollatorState::Waiting);
793                }
794            }
795        }
796
797        Ok(())
798    }
799
800    /// Process collated block candidate
801    /// 1. Save block to cache
802    /// 2. Spawn block validation
803    /// 3. Check if the master block interval elapsed (according to chain time) and schedule collation
804    /// 4. If master block then update last master block chain time
805    /// 5. Notify mempool about new master block (it may perform gc or nodes rotation)
806    /// 6. Execute master block processing routines
807    #[tracing::instrument(
808        skip_all,
809        fields(block_id = %collation_result.candidate.block.id().as_short_id()),
810    )]
811    pub async fn handle_collated_block_candidate(
812        self: &Arc<Self>,
813        collation_result: BlockCollationResult,
814    ) -> Result<()> {
815        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
816            ct = collation_result.candidate.chain_time,
817            processed_to_anchor_id = collation_result.candidate.processed_to_anchor_id,
818            "start processing block candidate",
819        );
820
821        let _histogram =
822            HistogramGuard::begin("tycho_collator_handle_collated_block_candidate_time");
823
824        let block_id = *collation_result.candidate.block.id();
825        let candidate_chain_time = collation_result.candidate.chain_time;
826        let consensus_config_changed = collation_result.candidate.consensus_config_changed;
827
828        debug_assert_eq!(
829            block_id.is_masterchain(),
830            collation_result.mc_data.is_some(),
831        );
832
833        // sync cache and collator state access
834        self.ready_to_sync.notified().await;
835        scopeguard::defer!(self.ready_to_sync.notify_one());
836
837        // find collation session related to this block by session id
838        let session_info = match self
839            .active_collation_sessions
840            .read()
841            .get(&block_id.shard)
842            .cloned()
843        {
844            Some(session_info) if session_info.id() == collation_result.collation_session_id => {
845                session_info
846            }
847            _ => {
848                // otherwise skip block because we have synced to a newer mc block
849                // and found out that we should not collated at all
850                tracing::warn!(
851                    target: tracing_targets::COLLATION_MANAGER,
852                    "there is no active session related to collated block. Skipped",
853                );
854
855                self.set_collator_state(&block_id.shard, |ac| ac.state = CollatorState::Waiting);
856
857                return Ok(());
858            }
859        };
860
861        // cancel collator now after produced block if cancel was requested during active collation
862        let updated_collator_state = self.set_collator_state(&block_id.shard, |ac| {
863            if ac.state == CollatorState::CancelPending {
864                ac.state = CollatorState::Cancelled;
865            }
866        });
867        let collator_cancelled = updated_collator_state == Some(CollatorState::Cancelled);
868
869        let store_res = if collator_cancelled {
870            tracing::info!(target: tracing_targets::COLLATION_MANAGER,
871                shard_id = %block_id.shard,
872                "collator was cancelled before",
873            );
874
875            // If collator was cancelled then should clear uncommitted queue diffs.
876            // E.g. the queue diff from just collated block is uncommitted at this point,
877            // so it will be deleted because we do not need this just collated block.
878            let top_shards = self.blocks_cache.get_last_top_shards();
879            self.mq_adapter.clear_uncommitted_state(&top_shards)?;
880
881            // we do not save just collated block,
882            // we take last existed info from cache to detect the next step
883            let (last_collated_mc_block_id, applied_mc_queue_range) = self
884                .blocks_cache
885                .get_last_collated_block_and_applied_mc_queue_range();
886
887            let store_res = BlockCacheStoreResult {
888                received_and_collated: false,
889                block_mismatch: false,
890                last_collated_mc_block_id,
891                applied_mc_queue_range,
892            };
893
894            tracing::info!(
895                target: tracing_targets::COLLATION_MANAGER,
896                ?store_res,
897                "block candidate was not saved to cache",
898            );
899
900            store_res
901        } else {
902            // if collator ws not cancelled then we consider that just collated block
903            // should be correct and try store it to the cache
904            let top_shard_blocks_info = collation_result
905                .mc_data
906                .as_ref()
907                .map(|mc_data| {
908                    mc_data
909                        .shards
910                        .iter()
911                        .map(|(shard_id, shard_descr)| TopBlockIdUpdated {
912                            block: TopBlockId {
913                                ref_by_mc_seqno: shard_descr.reg_mc_seqno,
914                                block_id: shard_descr.get_block_id(*shard_id),
915                            },
916                            updated: shard_descr.top_sc_block_updated,
917                        })
918                        .collect::<Vec<_>>()
919                })
920                .unwrap_or_default();
921            let top_processed_to_anchor = collation_result
922                .mc_data
923                .as_ref()
924                .map(|mc_data| mc_data.top_processed_to_anchor);
925            let store_res = self.blocks_cache.store_collated(
926                collation_result.candidate,
927                top_shard_blocks_info,
928                top_processed_to_anchor,
929            )?;
930
931            if store_res.block_mismatch {
932                let labels = [("workchain", block_id.shard.workchain().to_string())];
933                metrics::counter!("tycho_collator_block_mismatch_count", &labels).increment(1);
934
935                self.set_collator_state(&block_id.shard, |ac| ac.state = CollatorState::Cancelled);
936
937                // when master block mismatched then should cancel shard collators as well
938                if block_id.is_masterchain() {
939                    for mut ac in self
940                        .active_collators
941                        .iter_mut()
942                        .filter(|ac| ac.key() != &block_id.shard)
943                    {
944                        // now we cannot cancel active collation directly
945                        // so we mark collators to be cancelled when they finish current active collation
946                        ac.state = match ac.state {
947                            CollatorState::Waiting | CollatorState::Cancelled => {
948                                CollatorState::Cancelled
949                            }
950                            _ => CollatorState::CancelPending,
951                        };
952                    }
953                }
954
955                // we clear uncommitted queue diffs, including one from just collated block
956                // because it is not correct
957                let top_shards = self.blocks_cache.get_last_top_shards();
958                self.mq_adapter.clear_uncommitted_state(&top_shards)?;
959
960                tracing::info!(
961                    target: tracing_targets::COLLATION_MANAGER,
962                    ?store_res,
963                    "saved block candidate to cache",
964                );
965            } else {
966                tracing::debug!(
967                    target: tracing_targets::COLLATION_MANAGER,
968                    ?store_res,
969                    "saved block candidate to cache",
970                );
971            }
972
973            store_res
974        };
975
976        let collation_cancelled = collator_cancelled || store_res.block_mismatch;
977
978        // check if should sync to last applied mc block
979        let should_sync_to_last_applied_mc_block = 'check_should_sync: {
980            // do not sync to last applied mc block if newer already received
981            if let Some((_, applied_range_end)) = store_res.applied_mc_queue_range {
982                let last_received_mc_block_seqno = self.get_last_received_mc_block_seqno();
983                if matches!(
984                    last_received_mc_block_seqno,
985                    Some(last_received_seqno) if last_received_seqno.saturating_sub(applied_range_end) > 1
986                ) {
987                    tracing::info!(target: tracing_targets::COLLATION_MANAGER,
988                        last_received_mc_block_seqno,
989                        applied_range_end,
990                        "check_should_sync: should not sync to last applied mc block \
991                        because a newer one already received",
992                    );
993                    break 'check_should_sync false;
994                }
995            }
996
997            if collation_cancelled {
998                true
999            } else if let Some((_, applied_range_end)) = store_res.applied_mc_queue_range {
1000                if let Some(last_collated_mc_block_id) = store_res.last_collated_mc_block_id {
1001                    let applied_range_end_delta =
1002                        applied_range_end.saturating_sub(last_collated_mc_block_id.seqno);
1003                    if applied_range_end_delta < self.config.min_mc_block_delta_from_bc_to_sync {
1004                        // should collate next own mc block because last applied is not far ahead
1005                        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1006                            "check_should_sync: should collate next own mc block: \
1007                            last applied ({}) ahead last collated ({}) on {} < {}",
1008                            applied_range_end, last_collated_mc_block_id.seqno,
1009                            applied_range_end_delta, self.config.min_mc_block_delta_from_bc_to_sync,
1010                        );
1011                        false
1012                    } else {
1013                        // should sync to last applied mc block from bc because it is far ahead
1014                        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1015                            "check_should_sync: should sync to last applied mc block from bc: \
1016                            last applied ({}) ahead last collated ({}) on {} >= {}",
1017                            applied_range_end, last_collated_mc_block_id.seqno,
1018                            applied_range_end_delta, self.config.min_mc_block_delta_from_bc_to_sync,
1019                        );
1020                        true
1021                    }
1022                } else {
1023                    // should sync to last applied mc block from bc when last collated not exist
1024                    tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1025                        "check_should_sync: should sync to last applied mc block from bc: \
1026                        last applied ({}) and last collated not exist",
1027                        applied_range_end,
1028                    );
1029                    true
1030                }
1031            } else {
1032                // should collate next own mc block because no applied ahead
1033                tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1034                    "check_should_sync: should collate next own mc block after because nothing applied ahead",
1035                );
1036                false
1037            }
1038        };
1039
1040        // run sync or process just collated block
1041        if should_sync_to_last_applied_mc_block {
1042            // NOTE: we do not need to commit current master block candidate
1043            //      if it is "received and collated" because it is already applied to state
1044            //      and we do not need to notify state update and top processed to anchor
1045            //      because we will do this for block wich we sync to
1046
1047            if block_id.is_masterchain() {
1048                // run sync if have applied mc blocks
1049                self.sync_to_applied_mc_block_if_exist(
1050                    store_res.last_collated_mc_block_id,
1051                    store_res.applied_mc_queue_range,
1052                )
1053                .await?;
1054            } else {
1055                // cancel further collation of blocks in the current shard because we need to sync
1056                tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1057                    "sync_to_applied_mc_block: mark shard collator Cancelled for shard",
1058                );
1059
1060                self.set_collator_state(&block_id.shard, |ac| ac.state = CollatorState::Cancelled);
1061
1062                // run sync if all collators cancelled, or waiting, or there are no collators
1063                // and we have applied mc blocks
1064                let has_active = self.active_collators.iter().any(|ac| {
1065                    ac.state == CollatorState::Active || ac.state == CollatorState::CancelPending
1066                });
1067                if !has_active {
1068                    tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1069                        "sync_to_applied_mc_block: no active collators in shards and master, \
1070                        will run sync to last applied mc block",
1071                    );
1072
1073                    // run sync if have applied mc blocks
1074                    self.sync_to_applied_mc_block_if_exist(
1075                        store_res.last_collated_mc_block_id,
1076                        store_res.applied_mc_queue_range,
1077                    )
1078                    .await?;
1079                }
1080            }
1081        } else if block_id.is_masterchain() {
1082            // when candidate is master
1083
1084            // if consensus config was changed we should wait until master block is validated
1085            if consensus_config_changed == Some(true) {
1086                let mut delayed_mc_state_update = self.delayed_mc_state_update.lock();
1087                *delayed_mc_state_update = collation_result.mc_data.clone();
1088            } else {
1089                // otherwise we can notify state update to mempool right now
1090                self.notify_mc_state_update_to_mempool(collation_result.mc_data.clone().unwrap())
1091                    .await?;
1092            }
1093
1094            // process validation
1095            if store_res.received_and_collated {
1096                // NOTE: here commit will not cause on_block_accepted event
1097                //      because block already exist in bc state
1098
1099                self.commit_valid_master_block(&block_id).await?;
1100            } else {
1101                let validator = self.validator.clone();
1102                let validation_session_id = session_info.get_validation_session_id();
1103                let dispatcher = self.dispatcher.clone();
1104                tokio::spawn(async move {
1105                    let validation_result =
1106                        validator.validate(validation_session_id, &block_id).await;
1107
1108                    match validation_result {
1109                        Ok(status) => {
1110                            _ = dispatcher
1111                                .spawn_task(method_to_async_closure!(
1112                                    handle_validated_master_block,
1113                                    block_id,
1114                                    status
1115                                ))
1116                                .await;
1117                        }
1118                        Err(e) => {
1119                            tracing::error!(target: tracing_targets::COLLATION_MANAGER,
1120                                "block candidate validation failed: {e:?}",
1121                            );
1122                        }
1123                    }
1124                });
1125            }
1126
1127            // if consensus config was not changed execute master state update processing routines right now
1128            if consensus_config_changed != Some(true) {
1129                self.process_mc_state_update(
1130                    collation_result.mc_data.unwrap(),
1131                    ProcessMcStateUpdateMode::StartCollation {
1132                        reset_collators: false,
1133                    },
1134                )
1135                .await?;
1136            }
1137        } else {
1138            // when candidate is shard
1139
1140            // run master block collation if required or resume collation attempts in shard
1141            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1142                "will run next collation step",
1143            );
1144
1145            self.run_next_collation_step(
1146                &collation_result.prev_mc_block_id,
1147                block_id.shard,
1148                Some(block_id),
1149                DetectNextCollationStepContext::new(
1150                    candidate_chain_time,
1151                    collation_result.force_next_mc_block,
1152                    collation_result.collation_config.mc_block_min_interval_ms as _,
1153                    collation_result.collation_config.mc_block_max_interval_ms as _,
1154                    Some(CollatedBlockInfo::new(
1155                        collation_result.prev_mc_block_id.seqno,
1156                        collation_result.has_processed_externals,
1157                    )),
1158                ),
1159            )
1160            .await?;
1161        }
1162
1163        Ok(())
1164    }
1165
1166    /// Finish active sync if it is not finished yet
1167    /// and enqueue received block
1168    #[tracing::instrument(skip_all, fields(block_id = %ctx.state.block_id().as_short_id()))]
1169    pub async fn enqueue_handle_block_from_bc(&self, ctx: HandledBlockFromBcCtx) -> Result<()> {
1170        // TODO: Needs to redesign the task management logic.
1171        //      Current implementation with strange semaphores
1172        //      is unclear and may be confusing.
1173
1174        tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
1175            "cancel sync: enqueue_handle_block_from_bc: started",
1176        );
1177
1178        let labels = [
1179            (
1180                "workchain",
1181                ctx.state.block_id().shard.workchain().to_string(),
1182            ),
1183            ("src", "01_received".to_string()),
1184        ];
1185        metrics::gauge!("tycho_last_block_seqno", &labels).set(ctx.state.block_id().seqno);
1186
1187        self.update_last_received_mc_block_seqno(ctx.state.block_id());
1188
1189        // try to finish active sync
1190        self.finish_active_sync_to_applied(ctx.state.block_id());
1191
1192        // enqueue received block from processing
1193        self.blocks_from_bc_queue_sender.send(ctx).await?;
1194
1195        Ok(())
1196    }
1197
1198    /// Process the queue of received blocks from bc
1199    #[tracing::instrument(skip_all)]
1200    async fn process_handle_block_from_bc_queue(
1201        self: Arc<Self>,
1202        mut blocks_from_bc_queue_receiver: tokio::sync::mpsc::Receiver<HandledBlockFromBcCtx>,
1203        cancel_task: CancellationToken,
1204    ) -> Result<()> {
1205        const BATCH_SIZE: usize = 300;
1206
1207        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1208            "started",
1209        );
1210
1211        loop {
1212            let mut batch = Vec::with_capacity(BATCH_SIZE);
1213            tokio::select! {
1214                received_count = blocks_from_bc_queue_receiver.recv_many(&mut batch, BATCH_SIZE) => {
1215                    if received_count == 0 {
1216                        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1217                            "channel closed",
1218                        );
1219                        break;
1220                    }
1221
1222                    // find last master block in buffer
1223                    // will skip sync for all master blocks before it
1224                    let mut last_mc_block_id_opt = None;
1225                    for HandledBlockFromBcCtx { state, .. } in batch.iter().rev() {
1226                        if state.block_id().is_masterchain() {
1227                            last_mc_block_id_opt = Some(*state.block_id());
1228                        }
1229                    }
1230
1231                    for ctx in batch {
1232                        let is_last_mc_block_in_batch = matches!(
1233                            last_mc_block_id_opt, Some(last_mc_block_id) if ctx.state.block_id() == &last_mc_block_id
1234                        );
1235
1236                        // handle block from bc
1237                        self.handle_block_from_bc(ctx, is_last_mc_block_in_batch).await.map_err(|err| {
1238                            tracing::error!(target: tracing_targets::COLLATION_MANAGER,
1239                                "error handling block from bc: {err:?}",
1240                            );
1241                            err
1242                        })?;
1243                    }
1244                },
1245                _ = cancel_task.cancelled() => {
1246                    tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1247                        "cancelled",
1248                    );
1249                    break;
1250                }
1251            }
1252        }
1253
1254        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1255            "finished",
1256        );
1257
1258        Ok(())
1259    }
1260
1261    /// Process new block from blockchain:
1262    /// 1. Save block to cache
1263    /// 2. Stop block validation if needed
1264    /// 3. Commit block if it was collated first
1265    /// 4. Notify mempool about new master block
1266    /// 5. Sync to received block if it is far ahead last collated and last `synced_to`
1267    #[tracing::instrument(skip_all, fields(block_id = %ctx.state.block_id().as_short_id(), is_last_mc_block_in_batch))]
1268    async fn handle_block_from_bc(
1269        self: &Arc<Self>,
1270        ctx: HandledBlockFromBcCtx,
1271        is_last_mc_block_in_batch: bool,
1272    ) -> Result<()> {
1273        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1274            "start processing block from bc",
1275        );
1276
1277        let block_id = *ctx.state.block_id();
1278        debug_assert!(!block_id.is_masterchain() || block_id == ctx.mc_block_id);
1279
1280        let _histogram = HistogramGuard::begin("tycho_collator_handle_block_from_bc_time");
1281
1282        // sync cache and collator state access
1283        self.ready_to_sync.notified().await;
1284        scopeguard::defer!(self.ready_to_sync.notify_one());
1285
1286        let processed_upto = ProcessedUptoInfoStuff::try_from(ctx.processed_upto)?;
1287
1288        let Some(store_res) = self
1289            .blocks_cache
1290            .store_received(
1291                self.state_node_adapter.clone(),
1292                &ctx.mc_block_id,
1293                ctx.state.clone(),
1294                processed_upto,
1295            )
1296            .await?
1297        else {
1298            return Ok(());
1299        };
1300
1301        if store_res.block_mismatch {
1302            let labels = [("workchain", block_id.shard.workchain().to_string())];
1303            metrics::counter!("tycho_collator_block_mismatch_count", &labels).increment(1);
1304
1305            // now we cannot cancel active collation directly
1306            // so we mark collators to be cancelled when they finish current active collation
1307            self.set_collator_state(&block_id.shard, |ac| {
1308                ac.state = match ac.state {
1309                    CollatorState::Waiting | CollatorState::Cancelled => CollatorState::Cancelled,
1310                    _ => CollatorState::CancelPending,
1311                };
1312            });
1313
1314            // when master block mismatched then should cancel shard collators as well
1315            if block_id.is_masterchain() {
1316                for mut ac in self
1317                    .active_collators
1318                    .iter_mut()
1319                    .filter(|ac| ac.key() != &block_id.shard)
1320                {
1321                    ac.state = match ac.state {
1322                        CollatorState::Waiting | CollatorState::Cancelled => {
1323                            CollatorState::Cancelled
1324                        }
1325                        _ => CollatorState::CancelPending,
1326                    };
1327                }
1328            }
1329
1330            // When received blokc mismatches with collated one
1331            // then we should clear uncommitted queue diffs.
1332            // The queue diff from last collated master and its shard blocks
1333            // are uncommitted and will be removed because they are incorrect.
1334            let top_shards = self.blocks_cache.get_last_top_shards();
1335            self.mq_adapter.clear_uncommitted_state(&top_shards)?;
1336
1337            tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1338                ?store_res,
1339                "saved block from bc to cache",
1340            );
1341        } else {
1342            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1343                ?store_res,
1344                "saved block from bc to cache",
1345            );
1346        }
1347
1348        if !block_id.is_masterchain() {
1349            return Ok(());
1350        }
1351
1352        // when received block is master
1353        let is_key_block = ctx.state.state_extra()?.after_key_block;
1354
1355        // stop any running validations up to this block
1356        // try to get the validation session ID from active collation sessions
1357        let session_id = self
1358            .active_collation_sessions
1359            .read()
1360            .get(&block_id.shard)
1361            .map(|session_info| session_info.get_validation_session_id());
1362
1363        self.validator
1364            .cancel_validation(&block_id.as_short_id(), session_id)?;
1365
1366        // check if should sync to last applied mc block right now
1367        let should_sync_to_last_applied_mc_block = 'check_should_sync: {
1368            // sync only last master block from batch or key block
1369            if !is_last_mc_block_in_batch && !is_key_block {
1370                break 'check_should_sync false;
1371            }
1372
1373            // do not sync to last applied mc block if newer already received
1374            if let Some((_, applied_range_end)) = store_res.applied_mc_queue_range {
1375                let last_received_mc_block_seqno = self.get_last_received_mc_block_seqno();
1376                if matches!(
1377                    last_received_mc_block_seqno,
1378                    Some(last_received_seqno) if last_received_seqno.saturating_sub(applied_range_end) > 1
1379                ) {
1380                    tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1381                        last_received_mc_block_seqno,
1382                        received_is_key_block = is_key_block,
1383                        "check_should_sync: should not sync to last applied mc block \
1384                        because a newer one already received",
1385                    );
1386                    break 'check_should_sync false;
1387                }
1388            }
1389
1390            if let Some(top_mc_block_id_for_next_collation) =
1391                self.get_top_mc_block_id_for_next_collation(store_res.last_collated_mc_block_id)
1392            {
1393                // we can sync only when we have any applied block ahead
1394                if let Some((_, applied_range_end)) = store_res.applied_mc_queue_range {
1395                    // check if should sync according to master block delta
1396                    let should_sync = {
1397                        let applied_range_end_delta = applied_range_end
1398                            .saturating_sub(top_mc_block_id_for_next_collation.seqno);
1399
1400                        // we should sync to every key block if node is not in current vset
1401                        let required_min_mc_block_delta = if is_key_block
1402                            && !self.active_collators.contains_key(&ShardIdent::MASTERCHAIN)
1403                        {
1404                            1
1405                        } else {
1406                            self.config.min_mc_block_delta_from_bc_to_sync
1407                        };
1408
1409                        if applied_range_end_delta < required_min_mc_block_delta {
1410                            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1411                                last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()),
1412                                last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()),
1413                                last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()),
1414                                received_is_key_block = is_key_block,
1415                                "check_should_sync: should wait for next collated own mc block: \
1416                                last applied ({}) ahead of top for collation ({}) on {} < {}",
1417                                applied_range_end, top_mc_block_id_for_next_collation.seqno,
1418                                applied_range_end_delta, required_min_mc_block_delta,
1419                            );
1420                            false
1421                        } else {
1422                            tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1423                                last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()),
1424                                last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()),
1425                                last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()),
1426                                received_is_key_block = is_key_block,
1427                                "check_should_sync: should sync to last applied mc block from bc: \
1428                                last applied ({}) ahead of top for collation ({}) on {} >= {}",
1429                                applied_range_end, top_mc_block_id_for_next_collation.seqno,
1430                                applied_range_end_delta, required_min_mc_block_delta,
1431                            );
1432                            true
1433                        }
1434                    };
1435
1436                    if should_sync {
1437                        // we should sync but we can run sync right now only when there are no active collators
1438                        let mut has_active = false;
1439                        for active_collator in self.active_collators.iter().filter(|ac| {
1440                            ac.state == CollatorState::Active
1441                                || ac.state == CollatorState::CancelPending
1442                        }) {
1443                            // try to gracefully cancel active collations
1444                            active_collator.cancel_collation.notify_one();
1445                            has_active = true;
1446                        }
1447
1448                        if has_active {
1449                            tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1450                                last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()),
1451                                last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()),
1452                                last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()),
1453                                received_is_key_block = is_key_block,
1454                                "check_should_sync: cannot sync when there are active collations, \
1455                                try to gracefully cancel them",
1456                            );
1457                            false
1458                        } else {
1459                            tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1460                                last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()),
1461                                last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()),
1462                                last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()),
1463                                received_is_key_block = is_key_block,
1464                                "check_should_sync: can sync to last applied mc block \
1465                                when all collators were cancelled, or waiting, or there are no collators (node not in set)",
1466                            );
1467                            true
1468                        }
1469                    } else {
1470                        false
1471                    }
1472                } else {
1473                    // should collate next own mc block because no applied ahead
1474                    tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1475                        last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()),
1476                        last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()),
1477                        last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()),
1478                        "check_should_sync: should collate next own mc block after because nothing applied ahead",
1479                    );
1480                    false
1481                }
1482            } else {
1483                tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1484                    received_is_key_block = is_key_block,
1485                    "should sync to last applied mc block when no last collated or prev sync to",
1486                );
1487                true
1488            }
1489        };
1490
1491        if should_sync_to_last_applied_mc_block {
1492            // run sync if have applied mc blocks
1493            self.sync_to_applied_mc_block_if_exist(
1494                store_res.last_collated_mc_block_id,
1495                store_res.applied_mc_queue_range,
1496            )
1497            .await?;
1498        } else {
1499            // try to commit block if it was collated first
1500            if store_res.received_and_collated {
1501                // NOTE: here commit will not cause on_block_accepted event
1502                //      because block already exist in bc state
1503
1504                // NOTE: here master block subgraph could be already extracted,
1505                //      sent to sync, and removed from cache, because validation task
1506                //      could be finished after `store_received()` but before this point.
1507
1508                self.commit_valid_master_block(&block_id).await?;
1509            }
1510        }
1511
1512        Ok(())
1513    }
1514
1515    async fn sync_to_applied_mc_block_if_exist(
1516        self: &Arc<Self>,
1517        last_collated_mc_block_id: Option<BlockId>,
1518        applied_range: Option<(BlockSeqno, BlockSeqno)>,
1519    ) -> Result<()> {
1520        if let Some(applied_range) = applied_range {
1521            let this = self.clone();
1522
1523            let res = tokio::task::spawn_blocking(move || {
1524                this.sync_to_applied_mc_block(applied_range, last_collated_mc_block_id)
1525            })
1526            .await??;
1527
1528            if !res {
1529                tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1530                    last_applied_mc_block_id = %BlockIdShort {
1531                        shard: ShardIdent::MASTERCHAIN,
1532                        seqno: applied_range.1,
1533                    },
1534                    "sync_to_applied_mc_block: unable to sync to last applied mc block, \
1535                    need to receive next blocks from bc",
1536                );
1537            }
1538        } else {
1539            tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1540                "sync_to_applied_mc_block: there is no received applied mc blocks in cache, \
1541                will wait for next blocks from bc",
1542            );
1543        }
1544        Ok(())
1545    }
1546
    /// Restores the internal queue state, processes the last applied mc state
    /// and (unless the sync was cancelled meanwhile) restarts collation from it.
    ///
    /// `applied_range` is the `(first, last)` seqno range of applied mc blocks
    /// received into the blocks cache. `last_collated_mc_block_id` is used here
    /// only to decide whether `fast_sync` may skip already applied queue diffs.
    ///
    /// This function blocks the current thread (it drives async calls via
    /// `await_blocking`), so it should be called from a blocking context.
    ///
    /// Returns `Ok(false)` if the queue could not be restored (queue restore
    /// produced no last mc state, e.g. a required previous queue diff could
    /// not be loaded), and `Ok(true)` after a completed sync.
    #[tracing::instrument(skip_all, fields(applied_range = ?applied_range))]
    fn sync_to_applied_mc_block(
        &self,
        applied_range: (BlockSeqno, BlockSeqno),
        last_collated_mc_block_id: Option<BlockId>,
    ) -> Result<bool> {
        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
            "start sync to applied mc block",
        );

        let _histogram = HistogramGuard::begin("tycho_collator_sync_to_applied_mc_block_time");
        metrics::counter!("tycho_collator_sync_to_applied_mc_block_count").increment(1);

        let first_applied_mc_block_key = BlockIdShort {
            shard: ShardIdent::MASTERCHAIN,
            seqno: applied_range.0,
        };
        let last_applied_mc_block_key = BlockIdShort {
            shard: ShardIdent::MASTERCHAIN,
            seqno: applied_range.1,
        };

        // store active sync info with cancellation token;
        // the token is checked only once, right before processing the mc state update below
        let cancelled = self.set_active_sync_info(last_applied_mc_block_key.seqno)?;
        tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
            "cancel sync: sync_to_applied_mc_block: set_active_sync_info for seqno={}",
            last_applied_mc_block_key.seqno,
        );
        scopeguard::defer!({
            self.clean_active_sync_info();
            tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
                "cancel sync: sync_to_applied_mc_block: active_sync_info cleaned",
            );
        });

        // gc blocks from cache when sync finished
        // NOTE: deferred guards run in reverse declaration order on any exit (including `?`),
        //      so this gc runs before the active sync info above is cleaned
        scopeguard::defer!(self.blocks_cache.gc_prev_blocks());

        // we need to drop uncommitted internal messages from the queue
        // when mempool config override has genesis higher than in the last consensus info
        if let Some(mp_cfg_override) = &self.mempool_config_override {
            let last_consesus_info = self
                .blocks_cache
                .get_consensus_info_for_mc_block(&last_applied_mc_block_key)?;
            if mp_cfg_override
                .genesis_info
                .overrides(&last_consesus_info.genesis_info)
            {
                tracing::info!(target: tracing_targets::COLLATION_MANAGER,
                    prev_genesis_start_round = last_consesus_info.genesis_info.start_round,
                    prev_genesis_time_millis = last_consesus_info.genesis_info.genesis_millis,
                    new_genesis_start_round = mp_cfg_override.genesis_info.start_round,
                    new_genesis_time_millis = mp_cfg_override.genesis_info.genesis_millis,
                    "will drop uncommitted internal messages from queue on new genesis",
                );

                // E.g. we have applied mc block MC100 and collated some shard blocks after it (SC701, SC702)
                // so we have uncommitted queue diffs from these shard blocks SC701, SC702.
                // On restart from genesis we will start to collate SC701* again because last validated
                // and applied mc block is MC100. But mempool will not contain all old externals used to collate
                // the previous version of SC701. So the new diff will mismatch the old one and node will panic.
                // So we need to remove all uncommitted queue diffs because we have new mismatched externals queue.
                let top_shards = self.blocks_cache.get_last_top_shards();
                self.mq_adapter.clear_uncommitted_state(&top_shards)?;
            }
        }

        // get internals processed_to from master and all shards
        // for last applied master block
        let all_shards_processed_to_by_partitions =
            Self::get_all_shards_processed_to_by_partitions_for_mc_block(
                &last_applied_mc_block_key,
                &self.blocks_cache,
                self.state_node_adapter.clone(),
            )
            .await_blocking()?;

        // find internals min processed_to
        let min_processed_to_by_shards =
            find_min_processed_to_by_shards(&all_shards_processed_to_by_partitions);

        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
            ?min_processed_to_by_shards,
        );

        // for the first applied mc block read, per shard, the tail block id and the ids
        // of the blocks before it (their queue diffs may be required to restore the queue)
        let before_tail_block_ids = self
            .blocks_cache
            .read_before_tail_ids_of_mc_block(&first_applied_mc_block_key)?;

        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
            tail_block_ids = ?before_tail_block_ids
                .iter()
                .map(|(shard_id, (id, prev_ids))| {
                    format!(
                        "({}, id={:?}, prev_ids={})",
                        shard_id,
                        id.as_ref().map(DisplayAsShortId),
                        DisplayBlockIdsIntoIter(prev_ids),
                    )
                }).collect::<Vec<_>>().as_slice(),
        );

        // metrics - sync is running
        for shard in before_tail_block_ids.keys() {
            let labels = [("workchain", shard.workchain().to_string())];
            metrics::gauge!("tycho_collator_sync_is_running", &labels).set(1);
        }

        // if `fast_sync` is enabled then we will skip already applied queue diffs
        let queue_diffs_applied_to_top_blocks = if self.config.fast_sync
            && let Some(applied_to_mc_block_id) =
                self.get_queue_diffs_applied_to_mc_block_id(last_collated_mc_block_id)?
        {
            // BACKWARD COMPATIBILITY: `last_committed_mc_block_id` will not exist in queue storage
            // in previous version. We will receive None and will use all required diffs to restore the queue

            // NOTE: when the node started to sync from a persistent state with a bunch of archives after it,
            //      the `last_collated_mc_block_id` will be None, and `applied_to_mc_block_id` will be equal
            //      to the top block of the persistent state - init block. But the persistent state can be already
            //      removed because of the long history after it when collator starts to sync by recent blocks.
            //      So we will not able to read top blocks ids and will fallback the full sync.

            // collect top blocks queue diffs already applied to
            Self::get_top_blocks_seqno(
                &applied_to_mc_block_id,
                &self.blocks_cache,
                self.state_node_adapter.clone(),
            )
            .await_blocking()?
        } else {
            None
        };
        if queue_diffs_applied_to_top_blocks.is_some() {
            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                ?queue_diffs_applied_to_top_blocks,
                "will use fast sync to restore queue",
            );
        }

        // restore queue state and return latest applied master state
        let queue_restore_res = Self::restore_queue(
            &self.blocks_cache,
            self.state_node_adapter.clone(),
            self.mq_adapter.clone(),
            applied_range.0,
            min_processed_to_by_shards,
            before_tail_block_ids,
            queue_diffs_applied_to_top_blocks.unwrap_or_default(),
        )
        .await_blocking()?;

        // queue restore was aborted (no last mc state), so we cannot sync right now
        let Some(last_mc_state) = queue_restore_res.last_mc_state else {
            return Ok(false);
        };

        // process latest master state: notify mempool and refresh collation session
        let last_mc_block_id = *last_mc_state.block_id();

        // HACK: do not need to set master block latest chain time from zerostate when using mempool stub
        //      because anchors from stub have older chain time than in zerostate and it will break collation
        if last_mc_block_id.seqno > self.state_node_adapter.zerostate_id().seqno {
            Self::renew_mc_block_latest_chain_time(
                &mut self.collation_sync_state.lock(),
                last_mc_state.get_gen_chain_time(),
            );
        }

        Self::reset_collation_sync_status(&mut self.collation_sync_state.lock());

        // update last "synced to" info
        self.update_last_synced_to_mc_block_id(last_mc_block_id);

        // reset top shard blocks info
        // because next we will start to collate new shard blocks after the sync
        self.blocks_cache.reset_top_shard_blocks_additional_info();

        let mc_data = self.build_mc_data(
            last_mc_state,
            queue_restore_res.prev_mc_state,
            queue_restore_res.prev_mc_block_id,
            all_shards_processed_to_by_partitions,
        )?;

        self.blocks_cache
            .remove_next_collated_blocks_from_cache(&queue_restore_res.synced_to_blocks_keys);

        // notify state update to mempool
        self.notify_mc_state_update_to_mempool(mc_data.clone())
            .await_blocking()?;

        // when sync was cancelled we do not exit the sync, but skip starting collation
        let process_state_update_mode = match cancelled.is_cancelled() {
            true => ProcessMcStateUpdateMode::SkipCollation,
            false => ProcessMcStateUpdateMode::StartCollation {
                reset_collators: true,
            },
        };

        self.process_mc_state_update(mc_data.clone(), process_state_update_mode)
            .await_blocking()?;

        // handle top processed to anchor in mempool
        self.notify_top_processed_to_anchor_to_mempool(
            mc_data.block_id.seqno,
            mc_data.top_processed_to_anchor,
        )
        .await_blocking()?;

        // report last "synced to" blocks to metrics
        for synced_to_block_id in queue_restore_res.synced_to_blocks_keys {
            let labels = [
                (
                    "workchain",
                    synced_to_block_id.shard.workchain().to_string(),
                ),
                ("src", "02_synced_to".to_string()),
            ];
            metrics::gauge!("tycho_last_block_seqno", &labels).set(synced_to_block_id.seqno);
        }

        Ok(true)
    }
1776
1777    /// Builds `McData` from provided parts.
1778    /// Tries to load the previous mc state from storage when passed `None`.
1779    fn build_mc_data(
1780        &self,
1781        last_mc_state: ShardStateStuff,
1782        prev_mc_state: Option<ShardStateStuff>,
1783        prev_mc_block_id: Option<BlockId>,
1784        all_shards_processed_to_by_partitions: FastHashMap<
1785            ShardIdent,
1786            (bool, ProcessedToByPartitions),
1787        >,
1788    ) -> Result<Arc<McData>> {
1789        let prev_mc_state = match (prev_mc_state, prev_mc_block_id) {
1790            (Some(mc_state), _) => Some(mc_state),
1791            (None, None) => None,
1792            (None, Some(prev_mc_block_id)) if prev_mc_block_id.seqno <= self.state_node_adapter.zerostate_id().seqno => None,
1793            // NOTE: Use zero epoch here since we don't need to reuse these states.
1794            (None, Some(prev_mc_block_id)) => self
1795                .state_node_adapter
1796                .load_state(0, &prev_mc_block_id, Default::default())
1797                .await_blocking()
1798                .map_err(|err| {
1799                    tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
1800                        prev_mc_block_id = %prev_mc_block_id.as_short_id(),
1801                        error = ?err,
1802                        "failed to load previous mc state to build mc data, continue without prev_mc_data",
1803                    );
1804                    err
1805                }).ok(),
1806        };
1807
1808        McData::load_from_state(
1809            &last_mc_state,
1810            prev_mc_state.as_ref(),
1811            all_shards_processed_to_by_partitions,
1812        )
1813    }
1814
1815    async fn get_all_shards_processed_to_by_partitions_for_mc_block(
1816        mc_block_key: &BlockCacheKey,
1817        blocks_cache: &BlocksCache,
1818        state_node_adapter: Arc<dyn StateNodeAdapter>,
1819    ) -> Result<FastHashMap<ShardIdent, (bool, ProcessedToByPartitions)>> {
1820        let mut result = FastHashMap::default();
1821
1822        let zerostate_mc_seqno = blocks_cache.zerostate_mc_seqno();
1823        if mc_block_key.seqno <= zerostate_mc_seqno {
1824            return Ok(result);
1825        }
1826
1827        let from_cache = blocks_cache.get_top_blocks_processed_to_by_partitions(mc_block_key)?;
1828
1829        for (top_block_id, item) in from_cache {
1830            let processed_to = match item.by {
1831                Some(processed_to) => processed_to,
1832                None => {
1833                    if item.ref_by_mc_seqno <= zerostate_mc_seqno {
1834                        FastHashMap::default()
1835                    } else {
1836                        // get from state
1837                        let state = state_node_adapter
1838                            .load_state(mc_block_key.seqno, &top_block_id, LoadStateHint {
1839                                // State must already be applied at this point.
1840                                allow_ignore_direct: false,
1841                            })
1842                            .await?;
1843                        let processed_upto = state.state().processed_upto.load()?;
1844                        let processed_upto = ProcessedUptoInfoStuff::try_from(processed_upto)?;
1845                        processed_upto.get_internals_processed_to_by_partitions()
1846                    }
1847                }
1848            };
1849
1850            result.insert(top_block_id.shard, (item.updated, processed_to));
1851        }
1852
1853        Ok(result)
1854    }
1855
1856    // Returns top master block id upto which all queue diffs applied
1857    fn get_queue_diffs_applied_to_mc_block_id(
1858        &self,
1859        last_collated_mc_block_id: Option<BlockId>,
1860    ) -> Result<Option<BlockId>> {
1861        let last_queue_comitted_on = self.mq_adapter.get_last_committed_mc_block_id()?;
1862        let last_collated_or_synced_to =
1863            self.get_top_mc_block_id_for_next_collation(last_collated_mc_block_id);
1864
1865        let mc_block_id = match (last_queue_comitted_on, last_collated_or_synced_to) {
1866            (Some(last_queue_comitted_on), Some(last_collated_or_synced_to)) => {
1867                // return last collated if it exists (or last "synced to")
1868                // if above mc block on which the queue was committed
1869                if last_collated_or_synced_to.seqno >= last_queue_comitted_on.seqno {
1870                    Some(last_collated_or_synced_to)
1871                } else {
1872                    Some(last_queue_comitted_on)
1873                }
1874            }
1875            (Some(mc_block_id), _) | (_, Some(mc_block_id)) => Some(mc_block_id),
1876            _ => None,
1877        };
1878
1879        Ok(mc_block_id)
1880    }
1881
1882    /// Return last collated if it is ahead of last received and "synced to".
1883    /// Or last received and "synced to" if it is ahead of last correct collated.
1884    /// Last collated may be incorrect but we don not know this until we receive block from bc.
1885    /// We use this to detect the lag from last received to check if we need to run next sync.
1886    #[tracing::instrument(skip_all)]
1887    fn get_top_mc_block_id_for_next_collation(
1888        &self,
1889        last_collated_mc_block_id: Option<BlockId>,
1890    ) -> Option<BlockId> {
1891        let last_synced_to_mc_block_id = self.get_last_synced_to_mc_block_id();
1892
1893        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1894            ?last_synced_to_mc_block_id,
1895            ?last_collated_mc_block_id,
1896        );
1897
1898        match (last_synced_to_mc_block_id, last_collated_mc_block_id) {
1899            (Some(last_synced_to), Some(last_collated)) => {
1900                if last_synced_to.seqno > last_collated.seqno {
1901                    Some(last_synced_to)
1902                } else {
1903                    Some(last_collated)
1904                }
1905            }
1906            (Some(mc_block_id), _) | (_, Some(mc_block_id)) => Some(mc_block_id),
1907            _ => None,
1908        }
1909    }
1910
1911    #[tracing::instrument(skip_all, fields(from_mc_block_seqno))]
1912    async fn restore_queue(
1913        blocks_cache: &BlocksCache,
1914        state_node_adapter: Arc<dyn StateNodeAdapter>,
1915        mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
1916        from_mc_block_seqno: u32,
1917        min_processed_to_by_shards: BTreeMap<ShardIdent, QueueKey>,
1918        before_tail_block_ids: BTreeMap<ShardIdent, (Option<BlockId>, Vec<BlockId>)>,
1919        queue_diffs_applied_to_top_blocks: FastHashMap<ShardIdent, u32>,
1920    ) -> Result<RestoreQueueResult> {
1921        let mut res = RestoreQueueResult::default();
1922
1923        // load init block (from persistent state) to check if required diff was already applied from persistent
1924        let init_mc_block_id = state_node_adapter.load_init_block_id();
1925        let mut init_mc_block_reached_on = FastHashMap::new();
1926
1927        // try load required previous queue diffs
1928        let mut first_required_diffs = FastHashMap::new();
1929        for (shard_id, min_processed_to) in &min_processed_to_by_shards {
1930            let mut prev_queue_diffs = vec![];
1931            let Some((_, prev_block_ids)) = before_tail_block_ids.get(shard_id) else {
1932                continue;
1933            };
1934            let mut prev_block_ids: VecDeque<_> = prev_block_ids.iter().cloned().collect();
1935
1936            while let Some(prev_block_id) = prev_block_ids.pop_front() {
1937                // NOTE: We don't skip prev block ids for shard zerostates because
1938                // it is quite hard to propagate `ref_by_mc_seqno` here (we construct
1939                // prev ids based just on `BlockId` here). There seems to be no problems
1940                // with that because we are checking `init_mc_block_reached` using
1941                // the handle data so zerostate ids will be skipped in any case.
1942                // This check is just to not change the old behavior just in case.
1943                if prev_block_id.seqno == 0
1944                    || prev_block_id.is_masterchain()
1945                        && prev_block_id.seqno <= state_node_adapter.zerostate_id().seqno
1946                {
1947                    continue;
1948                }
1949
1950                // if diff is below top applied then skip
1951                if let Some(border) = queue_diffs_applied_to_top_blocks.get(shard_id)
1952                    && prev_block_id.seqno <= *border
1953                {
1954                    tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1955                        prev_block_id = %prev_block_id.as_short_id(),
1956                        top_applied_seqno = border,
1957                        "previous queue diff skipped because it below top applied",
1958                    );
1959                    continue;
1960                }
1961
1962                // if diff is before init block (from persistent state)
1963                // we do not need to apply it because queue was already restored from persistent
1964                if let Some(init_mc_block_id) = init_mc_block_id {
1965                    let mut skip_diff = false;
1966                    if prev_block_id.is_masterchain() {
1967                        if prev_block_id.seqno <= init_mc_block_id.seqno {
1968                            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1969                                prev_block_id = %prev_block_id.as_short_id(),
1970                                init_mc_block_id = %init_mc_block_id.as_short_id(),
1971                                "master block queue diff apply skipped because it is below init block from persistent state",
1972                            );
1973                            skip_diff = true;
1974                        }
1975                    } else {
1976                        // check if we already reached init mc block before
1977                        let mut init_mc_block_reached = matches!(
1978                            init_mc_block_reached_on.get(&prev_block_id.shard),
1979                            Some(reached_seqno) if prev_block_id.seqno <= *reached_seqno,
1980                        );
1981                        if !init_mc_block_reached {
1982                            // for shard block we should check it's `ref_by_mc_seqno`
1983                            let prev_ref_by_mc_seqno = state_node_adapter
1984                                .get_ref_by_mc_seqno(&prev_block_id)
1985                                .await?
1986                                .unwrap();
1987                            init_mc_block_reached = prev_ref_by_mc_seqno <= init_mc_block_id.seqno;
1988                            if init_mc_block_reached {
1989                                init_mc_block_reached_on
1990                                    .insert(prev_block_id.shard, prev_block_id.seqno);
1991                            }
1992                        }
1993                        if init_mc_block_reached {
1994                            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1995                                prev_block_id = %prev_block_id.as_short_id(),
1996                                init_mc_block_id = %init_mc_block_id.as_short_id(),
1997                                "shard block queue diff apply skipped because it is below init block from persistent state",
1998                            );
1999                            skip_diff = true;
2000                        }
2001                    }
2002                    if skip_diff {
2003                        // if current diff is below init block
2004                        // then we should check sequense for each next diff
2005                        first_required_diffs.insert(prev_block_id.shard, BlockId::default());
2006                        continue;
2007                    }
2008                }
2009
2010                // load diff to check if it is required
2011                let Some(queue_diff_stuff) = state_node_adapter.load_diff(&prev_block_id).await?
2012                else {
2013                    tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
2014                        prev_block_id = %prev_block_id,
2015                        "unable to load prev diff to sync queue state, cancel sync",
2016                    );
2017
2018                    // metrics - sync finished
2019                    for shard in before_tail_block_ids.keys() {
2020                        let labels = [("workchain", shard.workchain().to_string())];
2021                        metrics::gauge!("tycho_collator_sync_is_running", &labels).set(0);
2022                    }
2023
2024                    return Ok(res);
2025                };
2026                let diff_required = &queue_diff_stuff.as_ref().max_message > min_processed_to;
2027                tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
2028                    diff_block_id = %prev_block_id.as_short_id(),
2029                    diff_required,
2030                    max_message = %queue_diff_stuff.as_ref().max_message,
2031                    min_processed_to = %min_processed_to,
2032                    "check if diff required to restore queue working state on sync:",
2033                );
2034                if diff_required {
2035                    // if next diff is not required
2036                    // then current will be the first required
2037                    first_required_diffs.insert(prev_block_id.shard, prev_block_id);
2038
2039                    let block_stuff = state_node_adapter
2040                        .load_block(&prev_block_id)
2041                        .await?
2042                        .unwrap();
2043                    let out_msgs = block_stuff.load_extra()?.out_msg_description.load()?;
2044
2045                    let queue_diff_with_messages =
2046                        QueueDiffWithMessages::from_queue_diff(&queue_diff_stuff, &out_msgs)?;
2047
2048                    prev_queue_diffs.push((
2049                        queue_diff_with_messages,
2050                        *queue_diff_stuff.diff_hash(),
2051                        prev_block_id,
2052                        queue_diff_stuff.as_ref().min_message,
2053                        queue_diff_stuff.as_ref().max_message,
2054                    ));
2055
2056                    let prev_ids_info = block_stuff.construct_prev_id()?;
2057                    prev_block_ids.push_back(prev_ids_info.0);
2058                    if let Some(id) = prev_ids_info.1 {
2059                        prev_block_ids.push_back(id);
2060                    }
2061                }
2062            }
2063
2064            // apply required previous queue diffs for each shard
2065            while let Some((diff, diff_hash, block_id, min_message, max_message)) =
2066                prev_queue_diffs.pop()
2067            {
2068                let statistics =
2069                    DiffStatistics::from_diff(&diff, block_id.shard, min_message, max_message);
2070
2071                // we can skip the sequense check for the first required diff only
2072                let check_sequence = match first_required_diffs.get(&block_id.shard) {
2073                    Some(id) if *id == block_id => None,
2074                    _ => Some(DiffZone::Both),
2075                };
2076
2077                mq_adapter
2078                    .apply_diff(
2079                        diff,
2080                        block_id.as_short_id(),
2081                        &diff_hash,
2082                        statistics,
2083                        check_sequence,
2084                    )
2085                    .context("sync_to_applied_mc_block")?;
2086
2087                res.applied_diffs_ids.insert(block_id);
2088            }
2089        }
2090
2091        // will track last mc state and previous before it
2092        let mut prev_mc_state = None;
2093        let mut last_mc_state;
2094
2095        // extract all recevied blocks, apply required diffs
2096        // and return latest master state
2097        loop {
2098            // pop first applied mc block and sync
2099            // actually we can sync more mc blocks than known in applied_range
2100            // because we can receive new blocks from bc during sync
2101            let (mc_block_subgraph_extract, is_last) =
2102                blocks_cache.pop_front_applied_mc_block_subgraph(from_mc_block_seqno)?;
2103
2104            let subgraph = match mc_block_subgraph_extract {
2105                McBlockSubgraphExtract::Extracted(subgraph) => subgraph,
2106                McBlockSubgraphExtract::AlreadyExtracted => {
2107                    bail!("mc block subgraph extract result cannot be AlreadyExtracted")
2108                }
2109            };
2110
2111            let mc_block_entry = &subgraph.master_block;
2112
2113            // apply queue diffs from blocks above zerostate seqno
2114            // skip cached diffs below min_processed_to
2115            if subgraph.master_block.block_id.seqno > state_node_adapter.zerostate_id().seqno {
2116                for block_entry in [mc_block_entry]
2117                    .into_iter()
2118                    .chain(subgraph.shard_blocks.iter())
2119                {
2120                    // if diff is below top applied then skip
2121                    if let Some(border) =
2122                        queue_diffs_applied_to_top_blocks.get(&block_entry.block_id.shard)
2123                        && block_entry.block_id.seqno <= *border
2124                    {
2125                        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
2126                            received_block_id = %block_entry.block_id.as_short_id(),
2127                            top_applied_seqno = border,
2128                            "queue diff apply skipped because it is below top applied",
2129                        );
2130                        continue;
2131                    }
2132
2133                    let min_processed_to =
2134                        min_processed_to_by_shards.get(&block_entry.block_id.shard);
2135
2136                    if let Some(applied_diff_block_id) =
2137                        Self::apply_block_queue_diff_from_entry_stuff(
2138                            state_node_adapter.as_ref(),
2139                            mq_adapter.clone(),
2140                            block_entry,
2141                            min_processed_to,
2142                            &mut first_required_diffs,
2143                        )?
2144                    {
2145                        res.applied_diffs_ids.insert(applied_diff_block_id);
2146                    }
2147                }
2148            }
2149
2150            // we can gc to current master block when diffs were applied
2151            let to_blocks_keys = mc_block_entry.get_top_blocks_keys()?;
2152            blocks_cache.set_gc_to_boundary(&to_blocks_keys);
2153
2154            last_mc_state = mc_block_entry.cached_state()?.clone();
2155
2156            // on sync finish we commit diffs
2157            if is_last {
2158                let partitions = subgraph.get_partitions();
2159                Self::commit_block_queue_diff(
2160                    mq_adapter.clone(),
2161                    &mc_block_entry.block_id,
2162                    &mc_block_entry.top_shard_blocks_info,
2163                    &partitions,
2164                )?;
2165
2166                // when we run sync by any reason we should drop uncommitted queue updates
2167                // after restoring the required state
2168                // to avoid panics if next block was already collated before an it is incorrect
2169                let top_shards = blocks_cache.get_last_top_shards();
2170                mq_adapter.clear_uncommitted_state(&top_shards)?;
2171
2172                res.last_mc_state = Some(last_mc_state);
2173                res.prev_mc_state = prev_mc_state;
2174                res.prev_mc_block_id = mc_block_entry.prev_blocks_ids.first().copied();
2175                res.synced_to_blocks_keys.extend(to_blocks_keys.into_iter());
2176
2177                return Ok(res);
2178            }
2179
2180            prev_mc_state = Some(last_mc_state.clone());
2181        }
2182    }
2183
2184    fn get_last_processed_mc_block_id(&self) -> Option<BlockId> {
2185        *self.last_processed_mc_block_id.lock()
2186    }
2187
2188    fn check_should_process_and_update_last_processed_mc_block(&self, block_id: &BlockId) -> bool {
2189        let mut guard = self.last_processed_mc_block_id.lock();
2190        let last_processed_mc_block_id_opt = guard.as_ref();
2191        let (seqno_delta, is_equal) =
2192            Self::compare_mc_block_with(block_id, last_processed_mc_block_id_opt);
2193        if seqno_delta < 0 || is_equal {
2194            false
2195        } else {
2196            guard.replace(*block_id);
2197            true
2198        }
2199    }
2200
2201    /// Returns: (seqno delta from other, true - if equal).
2202    /// If `other_mc_block_id_opt` is none, returns : (0, false)
2203    fn compare_mc_block_with(
2204        mc_block_id: &BlockId,
2205        other_mc_block_id_opt: Option<&BlockId>,
2206    ) -> (i32, bool) {
2207        let (seqno_delta, is_equal) = match other_mc_block_id_opt {
2208            None => {
2209                tracing::info!(
2210                    target: tracing_targets::COLLATION_MANAGER,
2211                    "other mc block is None: current {} other ({:?}): is_equal = false, seqno_delta = 0",
2212                    mc_block_id.as_short_id(),
2213                    other_mc_block_id_opt.map(|b| b.as_short_id()),
2214                );
2215                (0, false)
2216            }
2217            Some(other_mc_block_id) => (
2218                mc_block_id.seqno as i32 - other_mc_block_id.seqno as i32,
2219                mc_block_id == other_mc_block_id,
2220            ),
2221        };
2222        if seqno_delta < 0 || is_equal {
2223            tracing::info!(
2224                target: tracing_targets::COLLATION_MANAGER,
2225                "mc block is NOT AHEAD of other: current {} other ({:?}): is_equal = {}, seqno_delta = {}",
2226                mc_block_id.as_short_id(),
2227                other_mc_block_id_opt.map(|b| b.as_short_id()),
2228                is_equal, seqno_delta,
2229            );
2230        }
2231        (seqno_delta, is_equal)
2232    }
2233
2234    async fn process_mc_state_update(
2235        &self,
2236        mc_data: Arc<McData>,
2237        mode: ProcessMcStateUpdateMode,
2238    ) -> Result<()> {
2239        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
2240            ?mode,
2241            "will process master state update",
2242        );
2243
2244        if let ProcessMcStateUpdateMode::StartCollation { reset_collators } = mode {
2245            let block_global = mc_data.config.get_global_version()?;
2246            if self.config.supported_block_version >= block_global.version
2247                && block_global
2248                    .capabilities
2249                    .is_subset_of(self.config.supported_capabilities)
2250            {
2251                self.refresh_collation_sessions(mc_data, reset_collators)
2252                    .await?;
2253            } else {
2254                tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
2255                    collator_supported_block_version = self.config.supported_block_version,
2256                    mc_block_version = block_global.version,
2257                    collator_supported_capabilities = ?self.config.supported_capabilities,
2258                    mc_block_capabilities = ?block_global.capabilities,
2259                    "Refresh collation sessions is skipped: collator does not support mc block version or capabilities",
2260                );
2261            }
2262        }
2263
2264        Ok(())
2265    }
2266
2267    /// Returns top processed to anchor id if delayed state was processed
2268    async fn notify_to_mempool_and_process_delayed_mc_state_update(
2269        &self,
2270        block_id: &BlockId,
2271    ) -> Result<Option<MempoolAnchorId>> {
2272        let mut delayed_mc_data = None;
2273        {
2274            let mut guard = self.delayed_mc_state_update.lock();
2275            if let Some(mc_data) = guard.clone()
2276                && mc_data.block_id <= *block_id
2277            {
2278                // process delayed mc state only if committed block is equal
2279                if mc_data.block_id == *block_id {
2280                    delayed_mc_data = Some(mc_data);
2281                }
2282
2283                // remove delayed mc state even if committed block is ahead
2284                *guard = None;
2285            }
2286        }
2287        if let Some(mc_data) = delayed_mc_data {
2288            self.notify_mc_state_update_to_mempool(mc_data.clone())
2289                .await?;
2290            self.process_mc_state_update(
2291                mc_data.clone(),
2292                ProcessMcStateUpdateMode::StartCollation {
2293                    reset_collators: false,
2294                },
2295            )
2296            .await?;
2297
2298            Ok(Some(mc_data.top_processed_to_anchor))
2299        } else {
2300            Ok(None)
2301        }
2302    }
2303
2304    async fn notify_mc_state_update_to_mempool(&self, mc_data: Arc<McData>) -> Result<()> {
2305        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
2306            block_id = %mc_data.block_id.as_short_id(),
2307            "will notify master state update to mempool",
2308        );
2309
2310        let prev_validator_set = self
2311            .validator_set_cache
2312            .get_prev_validator_set(&mc_data.config)?;
2313        let current_validator_set = self
2314            .validator_set_cache
2315            .get_current_validator_set(&mc_data.config)?;
2316        let next_validator_set = self
2317            .validator_set_cache
2318            .get_next_validator_set(&mc_data.config)?;
2319
2320        let cx = Box::new(StateUpdateContext {
2321            mc_block_id: mc_data.block_id,
2322            mc_block_chain_time: mc_data.gen_chain_time,
2323            top_processed_to_anchor_id: mc_data.top_processed_to_anchor,
2324            consensus_info: mc_data.consensus_info,
2325            shuffle_validators: mc_data.config.get_collation_config()?.shuffle_mc_validators,
2326            consensus_config: mc_data.config.get_consensus_config()?,
2327            prev_validator_set,
2328            current_validator_set,
2329            next_validator_set,
2330        });
2331
2332        self.mpool_adapter.handle_mc_state_update(cx).await
2333    }
2334
    /// Get shards and validator set info from the master state,
    /// then start missing collation sessions, finish outdated, resume actual.
    /// Start/stop/resume collators and validators.
    ///
    /// Steps, in order:
    /// 1. Return early if `mc_data.block_id` is not ahead of the last processed master block.
    ///    A different block with the same seqno is still processed.
    /// 2. Build the new shards set: masterchain plus the top block of each shard in
    ///    `mc_data.shards`. Log any detected split/merge actions; they are not applied to
    ///    the queue yet.
    /// 3. For each shard, decide whether to keep, restart, start or finish its session. The
    ///    decision uses the current catchain seqno, the validator subset short hash, and
    ///    whether this node's key is in the subset.
    /// 4. Start collators for new sessions and register a validation session for the
    ///    masterchain. Enqueue resume for kept sessions (forwarding `reset_collators`).
    ///    Finally, finish outdated sessions and enqueue stop for collators we no longer serve.
    ///
    /// # Errors
    /// Propagates errors from these sources:
    /// * reading the master state config;
    /// * computing split/merge actions or the validator subset;
    /// * adding a validation session;
    /// * enqueueing resume/stop on collators.
    ///
    /// It also fails when a kept session has no active collator. A failure to start a new
    /// collator is only logged.
    #[tracing::instrument(skip_all)]
    async fn refresh_collation_sessions(
        &self,
        mc_data: Arc<McData>,
        reset_collators: bool,
    ) -> Result<()> {
        tracing::debug!(
            target: tracing_targets::COLLATION_MANAGER,
            "Start refresh collation sessions by mc state ({})...",
            mc_data.block_id.as_short_id(),
        );

        let _histogram = HistogramGuard::begin("tycho_collator_refresh_collation_sessions_time");

        // do not re-process this master block if it is lower than the last processed one
        // or equal to it, but do process a new version of a block with the same seqno
        if !self.check_should_process_and_update_last_processed_mc_block(&mc_data.block_id) {
            return Ok(());
        }

        tracing::trace!(target: tracing_targets::COLLATION_MANAGER, "mc_data: {:?}", mc_data);

        // get new shards info from updated master state
        let mut new_shards_info = FastHashMap::default();
        new_shards_info.insert(ShardIdent::MASTERCHAIN, vec![mc_data.block_id]);
        for (shard_id, descr) in mc_data.shards.iter() {
            let top_block_id = descr.get_block_id(*shard_id);
            // TODO: consider split and merge
            new_shards_info.insert(*shard_id, vec![top_block_id]);
        }

        // update shards in msgs queue
        let active_shards_ids: Vec<_> = self
            .active_collation_sessions
            .read()
            .keys()
            .cloned()
            .collect();
        let new_shards_ids: Vec<&ShardIdent> = new_shards_info.keys().collect();
        tracing::debug!(
            target: tracing_targets::COLLATION_MANAGER,
            "Detecting split/merge actions to move from current shards {:?} to new shards {:?}...",
            active_shards_ids.as_slice(),
            new_shards_ids
        );

        // split/merge actions are only detected and logged here; the queue update is disabled
        let split_merge_actions = calc_split_merge_actions(&active_shards_ids, new_shards_ids)?;
        if !split_merge_actions.is_empty() {
            tracing::info!(
                target: tracing_targets::COLLATION_MANAGER,
                "Detected split/merge actions: {:?}",
                split_merge_actions,
            );
            // self.mq_adapter.update_shards(split_merge_actions).await?;
        }

        // find out the actual collation session start round from master state
        let current_session_seqno = mc_data.validator_info.catchain_seqno;

        // we need full validators set to define the subset for each session and to check if current node should collate
        let full_validators_set = mc_data.config.get_current_validator_set()?;
        tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
            "full_validators_set: since={}, until={}, main={}, total_weight={}, list={:?}",
            full_validators_set.utime_since, full_validators_set.utime_until,
            full_validators_set.main, full_validators_set.total_weight,
            DebugIter(full_validators_set.list.iter().map(|i| i.public_key)),
        );
        let collation_config = mc_data.config.get_collation_config()?;
        // Memoized subset lookup keyed by shard id. The computation itself ignores `shard_id`:
        // every shard gets the masterchain subset for `current_session_seqno`, indexed by
        // public key and paired with its short hash.
        let mut subset_cache = FastHashMap::new();
        let mut get_validator_subset = |shard_id| match subset_cache.entry(shard_id) {
            hash_map::Entry::Occupied(entry) => {
                let (subset, hash_short): &(Arc<FastHashMap<[u8; 32], ValidatorDescription>>, u32) =
                    entry.get();
                Result::<_>::Ok((subset.clone(), *hash_short))
            }
            hash_map::Entry::Vacant(entry) => {
                let (subset, hash_short) = full_validators_set
                    .compute_mc_subset(current_session_seqno, collation_config.shuffle_mc_validators)
                    .ok_or_else(|| anyhow!(
                        "Error calculating subset of validators for session (shard_id = {}, seqno = {})",
                        ShardIdent::MASTERCHAIN,
                        current_session_seqno,
                    ))?;

                let subset: FastHashMap<_, _> = subset
                    .into_iter()
                    .map(|vldr| (vldr.public_key.into(), vldr))
                    .collect();
                let subset = Arc::new(subset);

                entry.insert((subset.clone(), hash_short));
                Ok((subset, hash_short))
            }
        };

        // detect sessions and collators to start and to finish
        let mut sessions_to_keep = Vec::new();
        let mut sessions_to_start = Vec::new();
        let mut to_finish_sessions = Vec::new();
        let mut to_stop_collators = Vec::new();
        {
            // the sessions map stays write-locked for the whole classification pass;
            // nothing in this scope awaits
            let mut active_collation_sessions_guard = self.active_collation_sessions.write();
            // shards still left in this set after the loop are no longer present in the master state
            let mut missed_shards_ids: FastHashSet<_> = active_shards_ids.into_iter().collect();
            for (shard_id, block_ids) in new_shards_info {
                missed_shards_ids.remove(&shard_id);

                // check if current node is in subset
                let (subset, hash_short) = get_validator_subset(shard_id)?;
                let local_pubkey = find_us_in_collators_set(&self.keypair, &subset);

                // NOTE: the gauge is set once per shard, but every shard uses the same
                // masterchain subset, so all iterations set the same value
                if local_pubkey.is_none() {
                    tracing::debug!(
                        target: tracing_targets::COLLATION_MANAGER,
                        public_key = %self.keypair.public_key,
                        current_session_seqno,
                        hash_short,
                        "Current node was not authorized to collate shard {}. Use TRACE to see subset",
                        shard_id,
                    );
                    tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
                        subset = ?DebugIter(subset.values()),
                        "Current node was not authorized to collate shard {}",
                        shard_id,
                    );
                    metrics::gauge!("tycho_node_in_current_vset").set(0);
                } else {
                    metrics::gauge!("tycho_node_in_current_vset").set(1);
                }

                match active_collation_sessions_guard.entry(shard_id) {
                    hash_map::Entry::Occupied(entry) => {
                        let existing_session_info = entry.get().clone();
                        if local_pubkey.is_some() {
                            // start new session when seqno changed or subset changed for the same seqno
                            if existing_session_info.collators().short_hash == hash_short
                                && existing_session_info.seqno() == current_session_seqno
                            {
                                sessions_to_keep.push((shard_id, existing_session_info, block_ids));
                            } else {
                                // the collator is not stopped here: the start loop below
                                // finds it in `active_collators` and resumes it
                                to_finish_sessions.push(entry.remove());
                                sessions_to_start.push((shard_id, block_ids));
                            }
                        } else {
                            // we are no longer in the subset: finish the session and stop its collator
                            to_finish_sessions.push(entry.remove());
                            if let Some((_, collator)) = self.active_collators.remove(&shard_id) {
                                to_stop_collators.push((existing_session_info, collator));
                            }
                        }
                    }
                    hash_map::Entry::Vacant(_) => {
                        if local_pubkey.is_some() {
                            sessions_to_start.push((shard_id, block_ids));
                        }
                    }
                }
            }

            // if we still have some active sessions that do not match with new shards and validator subset
            // then we need to finish them and stop their collators
            for shard_id in missed_shards_ids {
                if let Some(existing_session_info) =
                    active_collation_sessions_guard.remove(&shard_id)
                {
                    to_finish_sessions.push(existing_session_info.clone());
                    if let Some((_, collator)) = self.active_collators.remove(&shard_id) {
                        to_stop_collators.push((existing_session_info, collator));
                    }
                }
            }
        }

        if !sessions_to_start.is_empty() {
            tracing::info!(
                target: tracing_targets::COLLATION_MANAGER,
                "Will start new collation sessions: {:?}",
                DebugIter(sessions_to_start.iter().map(|(s, _)| (s, current_session_seqno))),
            );
        }

        // we may have sessions to finish, collators to stop, and sessions to start,
        // and we start missing collators for new sessions
        for (shard_id, prev_blocks_ids) in sessions_to_start {
            // served from the cache filled during classification above
            let (subset, hash_short) = get_validator_subset(shard_id)?;

            let new_session_info = Arc::new(CollationSessionInfo::new(
                shard_id,
                current_session_seqno,
                ValidatorSubsetInfo {
                    validators: subset.values().cloned().collect(),
                    short_hash: hash_short,
                },
                Some(self.keypair.clone()),
            ));

            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                "new_session_info: {:?}",
                new_session_info,
            );

            // computed before `prev_blocks_ids` is moved into the collator context or keep list
            let next_block_id_short = calc_next_block_id_short(&prev_blocks_ids);

            match self.active_collators.entry(shard_id) {
                DashMapEntry::Occupied(_) => {
                    tracing::info!(
                        target: tracing_targets::COLLATION_MANAGER,
                        "Active collator exists for collation session {:?}. Will resume it",
                        new_session_info.id(),
                    );
                    sessions_to_keep.push((shard_id, new_session_info.clone(), prev_blocks_ids));
                }
                DashMapEntry::Vacant(entry) => {
                    tracing::info!(
                        target: tracing_targets::COLLATION_MANAGER,
                        "There is no active collator for collation session {:?}. Will start it",
                        new_session_info.id(),
                    );

                    let cancel_collation_notify = Arc::new(Notify::new());

                    // NOTE(review): the vacant DashMap entry (and the shard lock it holds)
                    // is kept across this `.await`
                    match self
                        .collator_factory
                        .start(CollatorContext {
                            mq_adapter: self.mq_adapter.clone(),
                            mpool_adapter: self.mpool_adapter.clone(),
                            state_node_adapter: self.state_node_adapter.clone(),
                            config: self.config.clone(),
                            collation_session: new_session_info.clone(),
                            listener: self.dispatcher.clone(),
                            zerostate_id: *self.state_node_adapter.zerostate_id(),
                            shard_id,
                            prev_blocks_ids,
                            mc_data: mc_data.clone(),
                            mempool_config_override: self.mempool_config_override.clone(),
                            cancel_collation: cancel_collation_notify.clone(),
                        })
                        .await
                    {
                        Err(err) => {
                            // NOTE(review): on failure the session is still registered below.
                            // A later refresh with the same subset/seqno puts it into
                            // `sessions_to_keep`, and the keep loop then `bail!`s because
                            // no collator exists for the shard.
                            tracing::error!(target: tracing_targets::COLLATION_MANAGER,
                                session_id = ?new_session_info.id(),
                                ?err,
                                "error starting collator"
                            );
                        }
                        Ok(collator) => {
                            entry.insert(ActiveCollator {
                                collator: Arc::new(collator),
                                state: CollatorState::Active,
                                cancel_collation: cancel_collation_notify,
                            });
                        }
                    }
                }
            }

            // need to add validation session only for masterchain blocks, shard blocks are not being validated
            if shard_id.is_masterchain() {
                self.validator.add_session(AddSession {
                    shard_ident: shard_id,
                    session_id: new_session_info.get_validation_session_id(),
                    start_block_seqno: next_block_id_short.seqno,
                    validators: &new_session_info.collators().validators,
                })?;
            }

            self.active_collation_sessions
                .write()
                .insert(shard_id, new_session_info);
        }

        tracing::debug!(
            target: tracing_targets::COLLATION_MANAGER,
            "Will keep existing collation sessions: {:?}",
            DebugIter(sessions_to_keep.iter().map(|(_, s, _)| s.id())),
        );

        // update master state in existing collators and resume collation
        for (shard_id, new_session_info, prev_blocks_ids) in sessions_to_keep {
            // mark the collator active and clone its handle; the map guard is released
            // before awaiting on the collator
            let collator = {
                let Some(mut active_collator) = self.active_collators.get_mut(&shard_id) else {
                    bail!(
                        "Collator for shard should exist for active session {:?}",
                        new_session_info.id(),
                    )
                };
                active_collator.state = CollatorState::Active;
                active_collator.collator.clone()
            };

            tracing::debug!(
                target: tracing_targets::COLLATION_MANAGER,
                "Resuming collation attempts in shard session {:?}",
                new_session_info.id(),
            );
            collator
                .enqueue_resume_collation(
                    mc_data.clone(),
                    reset_collators,
                    new_session_info,
                    prev_blocks_ids,
                )
                .await?;
        }

        if !to_finish_sessions.is_empty() {
            tracing::info!(
                target: tracing_targets::COLLATION_MANAGER,
                "Will finish outdated collation sessions: {:?}",
                DebugIter(to_finish_sessions.iter().map(|s| s.id())),
            );
        }

        // enqueue outdated sessions finish tasks
        // (`finish_collation_session` removes the entry inserted here right away)
        for session_info in to_finish_sessions {
            self.collation_sessions_to_finish
                .insert(session_info.id(), session_info.clone());
            self.finish_collation_session(session_info)?;
        }

        if !to_stop_collators.is_empty() {
            tracing::info!(
                target: tracing_targets::COLLATION_MANAGER,
                "Will stop collators for sessions that we do not serve: {:?}",
                DebugIter(to_stop_collators.iter().map(|(s, _)| s.id())),
            );
        }

        // enqueue dangling collators stop tasks; the collator stays in `collators_to_stop`
        // until `handle_collator_stopped` removes it
        for (session_info, active_collator) in to_stop_collators {
            let collator = active_collator.collator.clone();
            self.collators_to_stop
                .insert(session_info.id(), active_collator);
            collator.enqueue_stop().await?;
        }

        Ok(())

        // finally we will have initialized `active_collation_sessions`
        // and `active_collators` which run async block collations processes
    }
2678
2679    /// Execute collation session finalization routines
2680    pub fn finish_collation_session(
2681        &self,
2682        collation_session: Arc<CollationSessionInfo>,
2683    ) -> Result<()> {
2684        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
2685            "finish_collation_session: {:?}", collation_session.id(),
2686        );
2687        self.collation_sessions_to_finish
2688            .remove(&collation_session.id());
2689        Ok(())
2690    }
2691
2692    /// Remove stopped collator from cache
2693    pub fn handle_collator_stopped(&self, collation_session_id: CollationSessionId) -> Result<()> {
2694        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
2695            "handle_collator_stopped: {:?}", collation_session_id,
2696        );
2697        self.collators_to_stop.remove(&collation_session_id);
2698        Ok(())
2699    }
2700
2701    fn set_collator_state<F>(&self, shard_id: &ShardIdent, f: F) -> Option<CollatorState>
2702    where
2703        F: Fn(&mut ActiveCollator<Arc<CF::Collator>>),
2704    {
2705        match self.active_collators.get_mut(shard_id) {
2706            Some(mut active_collator) => {
2707                f(&mut active_collator);
2708                Some(active_collator.state)
2709            }
2710            None => None,
2711        }
2712    }
2713
2714    fn set_active_sync_info(&self, target_mc_block_seqno: BlockSeqno) -> Result<CancellationToken> {
2715        let mut guard = self.collation_sync_state.lock();
2716        if let Some(active_sync) = &guard.active_sync_to_applied {
2717            bail!(
2718                "previous sync_to_applied_mc_block should be finished \
2719                before: previous seqno={}, target seqno={}",
2720                active_sync.target_mc_block_seqno,
2721                target_mc_block_seqno,
2722            )
2723        }
2724
2725        let cancelled = CancellationToken::new();
2726        guard.active_sync_to_applied = Some(ActiveSync {
2727            target_mc_block_seqno,
2728            cancelled: cancelled.clone(),
2729        });
2730
2731        Ok(cancelled)
2732    }
2733
2734    fn clean_active_sync_info(&self) {
2735        let mut guard = self.collation_sync_state.lock();
2736        guard.active_sync_to_applied = None;
2737    }
2738
2739    fn update_last_received_mc_block_seqno(&self, received_block_id: &BlockId) {
2740        if !received_block_id.is_masterchain() {
2741            return;
2742        }
2743
2744        let mut guard = self.collation_sync_state.lock();
2745        guard.last_received_mc_block_seqno = Some(received_block_id.seqno);
2746    }
2747
2748    fn get_last_received_mc_block_seqno(&self) -> Option<BlockSeqno> {
2749        let guard = self.collation_sync_state.lock();
2750        guard.last_received_mc_block_seqno
2751    }
2752
2753    fn update_last_synced_to_mc_block_id(&self, mc_block_id: BlockId) {
2754        let mut guard = self.collation_sync_state.lock();
2755        guard.last_synced_to_mc_block_id = Some(mc_block_id);
2756    }
2757
2758    fn get_last_synced_to_mc_block_id(&self) -> Option<BlockId> {
2759        let guard = self.collation_sync_state.lock();
2760        guard.last_synced_to_mc_block_id
2761    }
2762
2763    /// Returns `true` if active sync was cancelled
2764    fn finish_active_sync_to_applied(&self, received_block_id: &BlockId) -> bool {
2765        // can finish active sync only if new master block received
2766        if !received_block_id.is_masterchain() {
2767            return false;
2768        }
2769
2770        let guard = self.collation_sync_state.lock();
2771
2772        // call to finish active sync if it exists and received block is newer
2773        if let Some(active_sync_info) = &guard.active_sync_to_applied {
2774            // call to finish active sync if new block is far ahead
2775            if received_block_id
2776                .seqno
2777                .saturating_sub(active_sync_info.target_mc_block_seqno)
2778                >= self.config.min_mc_block_delta_from_bc_to_sync
2779            {
2780                tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
2781                    prev_target_block_id = %BlockIdShort {
2782                        shard: ShardIdent::MASTERCHAIN,
2783                        seqno: active_sync_info.target_mc_block_seqno,
2784                    },
2785                    "cancel sync: will force finish previous sync \
2786                    to applied master block if not started to process state update",
2787                );
2788                active_sync_info.cancelled.cancel();
2789                return true;
2790            } else {
2791                // otherwise allow to handle newer block from bc when previous process started
2792                tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
2793                    prev_target_block_id = %BlockIdShort {
2794                        shard: ShardIdent::MASTERCHAIN,
2795                        seqno: active_sync_info.target_mc_block_seqno,
2796                    },
2797                    "cancel sync: will not force finish previous sync \
2798                    to applied master block, because new block is not far ahead",
2799                );
2800            }
2801        } else {
2802            tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
2803                "cancel sync: route_handle_block_from_bc: no active sync to cancel",
2804            );
2805        }
2806
2807        false
2808    }
2809
2810    /// Set master block latest chain time to calc next interval for master block collation.
2811    /// Prune all cached chain times for all shards upto current
2812    fn renew_mc_block_latest_chain_time(guard: &mut CollationSyncState, chain_time: u64) {
2813        if guard.mc_block_latest_chain_time < chain_time {
2814            guard.mc_block_latest_chain_time = chain_time;
2815        }
2816
2817        // prune
2818        for (_, collation_state) in guard.states.iter_mut() {
2819            collation_state
2820                .last_imported_anchor_events
2821                .retain(|it| it.ct > chain_time);
2822        }
2823    }
2824
2825    /// Reset collation status from `WaitForMasterStatus` to `AttemptsInProgress` for every shard.
2826    ///
2827    /// Use this method before resuming collation after sync to avoid ambiguous situations.
2828    /// If any shard has collation status `WaitForMasterStatus` and sync was executed,
2829    /// when master collation check was finished first then it will enqueue one more resume for shard,
2830    /// so we will have two parallel collations for shard that will cause panic further.
2831    fn reset_collation_sync_status(guard: &mut CollationSyncState) {
2832        for (_, collation_state) in guard.states.iter_mut() {
2833            if collation_state.status == CollationStatus::WaitForMasterStatus {
2834                collation_state.status = CollationStatus::AttemptsInProgress;
2835            }
2836        }
2837    }
2838
    /// Records a new imported anchor event for `shard_id` and decides the next collation step.
    ///
    /// Steps, in the order they run:
    /// 1. Update the forcing flags in `guard`:
    ///    - `mc_collation_forced_for_all` is set when the master shard reports
    ///      `ByUnprocessedMessages`.
    ///    - `mc_forced_by_no_pending_msgs_on_ct` is set when any shard reports
    ///      `NoPendingMessagesAfterShardBlocks`.
    /// 2. Append an [`ImportedAnchorEvent`] for `shard_id`.
    /// 3. Compute a `ShardFact` for every shard in `active_shards`. Intervals are measured
    ///    from `guard.mc_block_latest_chain_time`.
    /// 4. Pick one candidate master block chain time per active shard (see `choose_candidate`).
    /// 5. Decide the outcome:
    ///    - If every active shard has a candidate, return
    ///      [`NextCollationStep::CollateMaster`] with the maximum candidate. All statuses are
    ///      reset to `AttemptsInProgress`, master events above that ct are pruned, the
    ///      latest chain time is renewed and both forcing flags are cleared.
    ///    - Otherwise update the shard statuses and return one of these:
    ///      `ResumeAttemptsIn(shards)` if any shard should resume;
    ///      `WaitForShardStatus` if the current shard is master;
    ///      `WaitForMasterStatus` otherwise.
    ///
    /// A `mc_block_max_interval_ms` of `0` is treated as equal to `mc_block_min_interval_ms`.
    ///
    /// # Panics
    /// Panics if `active_shards` does not contain `shard_id`.
    #[tracing::instrument(skip_all)]
    fn detect_next_collation_step(
        guard: &mut CollationSyncState,
        active_shards: Vec<ShardIdent>,
        shard_id: ShardIdent,
        ctx: DetectNextCollationStepContext,
    ) -> NextCollationStep {
        let DetectNextCollationStepContext {
            last_imported_anchor_ct,
            force_mc_block,
            mc_block_min_interval_ms,
            mc_block_max_interval_ms,
            collated_block_info,
        } = ctx;

        // backward compatibility: if max interval is not defined take it equal to min interval
        let mc_block_max_interval_ms = if mc_block_max_interval_ms == 0 {
            mc_block_min_interval_ms
        } else {
            mc_block_max_interval_ms
        };

        let _histogram = HistogramGuard::begin("detect_next_collation_step_time");
        assert!(
            active_shards.contains(&shard_id),
            "active_shards must include current shard"
        );

        // base chain time that min/max intervals are measured from
        let mc_block_latest_chain_time = guard.mc_block_latest_chain_time;

        // Check if masterchain collation state exists (already imported anchor events from MC shard)
        let mc_collation_state_exist = shard_id.is_masterchain()
            || guard
                .states
                .get(&ShardIdent::MASTERCHAIN)
                .is_some_and(|state| !state.last_imported_anchor_events.is_empty());

        // Force collation for all if MC shard explicitly requires it (unprocessed messages)
        if shard_id.is_masterchain()
            && matches!(force_mc_block, ForceMasterCollation::ByUnprocessedMessages)
        {
            guard.mc_collation_forced_for_all = true;
        }

        // Another forcing rule: no pending messages after shard blocks
        if matches!(
            force_mc_block,
            ForceMasterCollation::NoPendingMessagesAfterShardBlocks
        ) {
            guard.mc_forced_by_no_pending_msgs_on_ct = Some(last_imported_anchor_ct);
        }

        // snapshot of the forcing flags, including any update made just above;
        // both flags persist in `guard` until a master collation is decided
        let hard_forced_for_all = guard.mc_collation_forced_for_all;
        let mc_forced_by_no_pending_msgs_on_ct = guard.mc_forced_by_no_pending_msgs_on_ct;

        // Determine if the current shard collation is explicitly forced in new anchor event
        let forced_in_current_shard = force_mc_block.is_forced();

        // Add new anchor event for current shard
        // (the trace below is emitted just before the actual push)
        tracing::trace!(
            target: tracing_targets::COLLATION_MANAGER,
            shard_id=?shard_id,
            last_imported_anchor_ct,
            forced_in_current_shard,
            ?collated_block_info,
            "anchor event appended"
        );
        let current_collation_state = guard.states.entry(shard_id).or_default();
        current_collation_state
            .last_imported_anchor_events
            .push(ImportedAnchorEvent {
                ct: last_imported_anchor_ct,
                mc_forced: forced_in_current_shard,
                collated_block_info,
            });

        // Per-shard facts to simplify decision-making
        #[derive(Debug, Clone)]
        struct ShardFact {
            shard_id: ShardIdent,
            status: CollationStatus,   // current collation status for shard
            first_ct: Option<u64>,     // first ct
            mc_forced_ct: Option<u64>, // first ct on which mc block collation forced
            min_ct: Option<u64>,       // first ct >= min interval
            max_ct: Option<u64>,       // first ct >= max interval
            has_shard_block_with_externals: bool, // produced shard block with externals
        }
        impl ShardFact {
            /// Fact with the given status and no chain time data (used for shards without state).
            fn with_status(shard_id: ShardIdent, status: CollationStatus) -> Self {
                Self {
                    shard_id,
                    status,
                    first_ct: None,
                    mc_forced_ct: None,
                    min_ct: None,
                    max_ct: None,
                    has_shard_block_with_externals: false,
                }
            }
            /// Scans `state.last_imported_anchor_events` in stored order. Interval checks
            /// use `s.ct.saturating_sub(mc_ct)` against `min_interval_ms` and `max_interval_ms`.
            fn calc(
                shard_id: ShardIdent,
                state: &CollationState,
                mc_ct: u64,
                min_interval_ms: u64,
                max_interval_ms: u64,
            ) -> Self {
                let mut fact = Self::with_status(shard_id, state.status);

                let mut last_known_collated_block_info: Option<CollatedBlockInfo> = None;

                for s in &state.last_imported_anchor_events {
                    // check for the first shard block with externals
                    let mut is_first_shard_block_with_externals = false;
                    if let Some(curr_b_info) = s.collated_block_info {
                        // find first block after previous master
                        // (true for the first collated block seen, or when
                        // `prev_mc_block_seqno` increased compared to the last one seen)
                        let is_first_after_prev_master = match &last_known_collated_block_info {
                            Some(last_b_info)
                                if last_b_info.prev_mc_block_seqno
                                    < curr_b_info.prev_mc_block_seqno =>
                            {
                                true
                            }
                            None => true,
                            _ => false,
                        };

                        // the externals flag is tracked only for non-master shards
                        if !shard_id.is_masterchain() {
                            // if found then will seek for the first shard block with externals
                            if is_first_after_prev_master {
                                fact.has_shard_block_with_externals = false;
                            }

                            // remember the first shard block with externals
                            is_first_shard_block_with_externals = curr_b_info
                                .has_processed_externals
                                && !fact.has_shard_block_with_externals;
                            if is_first_shard_block_with_externals {
                                fact.has_shard_block_with_externals = true;
                            }
                        }

                        last_known_collated_block_info = Some(curr_b_info);
                    }

                    if fact.first_ct.is_none() {
                        fact.first_ct = Some(s.ct);
                    }

                    if s.mc_forced && fact.mc_forced_ct.is_none() {
                        fact.mc_forced_ct = Some(s.ct);
                    }

                    // we take only first ct that exceed min interval
                    // or the next that goes with the first shard block with externals
                    if (fact.min_ct.is_none() || is_first_shard_block_with_externals)
                        && s.ct.saturating_sub(mc_ct) >= min_interval_ms
                    {
                        fact.min_ct = Some(s.ct);
                    }

                    // we take only first ct that exceed max interval
                    if fact.max_ct.is_none() && s.ct.saturating_sub(mc_ct) >= max_interval_ms {
                        fact.max_ct = Some(s.ct);
                    }
                }

                fact
            }
        }

        // Collect facts for all active shards
        // (an active shard with no stored state counts as `AttemptsInProgress` with no data)
        let mut facts = Vec::with_capacity(active_shards.len());
        for sid in &active_shards {
            if let Some(st) = guard.states.get(sid) {
                let f = ShardFact::calc(
                    *sid,
                    st,
                    mc_block_latest_chain_time,
                    mc_block_min_interval_ms,
                    mc_block_max_interval_ms,
                );
                facts.push(f);
            } else {
                facts.push(ShardFact::with_status(
                    *sid,
                    CollationStatus::AttemptsInProgress,
                ));
            }
        }

        tracing::debug!(
            target: tracing_targets::COLLATION_MANAGER,
            mc_block_latest_chain_time,
            mc_block_min_interval_ms,
            mc_block_max_interval_ms,
            hard_forced_for_all,
            mc_forced_by_no_pending_msgs_on_ct,
            ?facts,
            "calculated shard facts"
        );

        let any_has_shard_block_with_externals =
            facts.iter().any(|f| f.has_shard_block_with_externals);

        /// Picks the master block chain time candidate for one shard. Priority order:
        /// 1. `f.mc_forced_ct`;
        /// 2. `f.first_ct` when master collation is hard-forced for all;
        /// 3. when any shard has a block with externals, or some shard forced master on a
        ///    ct: `f.min_ct` raised to the forced ct, but only if the shard is the current
        ///    one or is ready/waiting (in this case `max_ct` is NOT used as a fallback);
        /// 4. otherwise `f.max_ct`.
        fn choose_candidate(
            curr_sid: &ShardIdent,
            f: &ShardFact,
            mc_forced_by_shard_on_ct: Option<u64>,
            any_has_shard_block_with_externals: bool,
            hard_forced_for_all: bool,
        ) -> Option<u64> {
            // take into account shard if it is current
            // or it is ready to collate or waiting
            let ready_to_detect_next_step = f.shard_id == *curr_sid
                || matches!(
                    f.status,
                    CollationStatus::ReadyToCollateMaster
                        | CollationStatus::WaitForMasterStatus
                        | CollationStatus::WaitForShardStatus
                );

            // chain time on which master was forced
            f.mc_forced_ct
                // first ct if master was forced for all
                .or(if hard_forced_for_all {
                    f.first_ct
                } else {
                    None
                })
                .or(
                    // chain time that exceed min interval or when was forced by shard
                    // when any shard has collated shard block with externals
                    // or any shard has forced master block (e.g. by no more pending messages)
                    // if shard is ready to detect next step
                    if any_has_shard_block_with_externals || mc_forced_by_shard_on_ct.is_some() {
                        if ready_to_detect_next_step {
                            f.min_ct.map(|min_ct| {
                                mc_forced_by_shard_on_ct
                                    .map_or(min_ct, |forced_ct| min_ct.max(forced_ct))
                            })
                        } else {
                            None
                        }
                    }
                    // finally take chain time that exceed max interval
                    else {
                        f.max_ct
                    },
                )
        }

        // get next mc block ct candidates from all shards
        // and detect if should collate by current shard
        let mut should_collate_by_current_shard = false;

        let candidates: Vec<_> = facts
            .iter()
            .map(|f| {
                let ct = choose_candidate(
                    &shard_id,
                    f,
                    mc_forced_by_no_pending_msgs_on_ct,
                    any_has_shard_block_with_externals,
                    hard_forced_for_all,
                );
                if f.shard_id == shard_id && ct.is_some() {
                    should_collate_by_current_shard = true;
                }
                ct
            })
            .collect();

        // check if should collate by every shard
        let should_collate_by_every_shard =
            !candidates.is_empty() && candidates.iter().all(|c| c.is_some());

        tracing::debug!(
            target: tracing_targets::COLLATION_MANAGER,
            shard_id=?shard_id,
            ?candidates,
            should_collate_by_current_shard,
            should_collate_by_every_shard,
            any_has_shard_block_with_externals,
            hard_forced_for_all,
            mc_forced_by_no_pending_msgs_on_ct,
            mc_block_min_interval_ms,
            mc_block_max_interval_ms,
            "candidates collected"
        );

        // If not all shards ready to collate next master then resume attempts
        if !should_collate_by_every_shard {
            let mut shards_to_resume_attempts = vec![];

            // check if other shards ready to collate
            // NOTE(review): this scans every entry in `guard.states`, not only
            // `active_shards` — confirm that states of inactive shards cannot linger here
            let all_other_shards_ready_to_collate = guard.states.iter().all(|(sid, s)| {
                *sid == shard_id || s.status == CollationStatus::ReadyToCollateMaster
            });

            // If current shard is not ready to collate master then try to resume attempts
            let current_collation_state = guard.states.entry(shard_id).or_default();
            if !should_collate_by_current_shard {
                if !mc_collation_state_exist {
                    // wait for master collation status if it not exist yet
                    current_collation_state.status = CollationStatus::WaitForMasterStatus;
                } else if shard_id.is_masterchain() && !all_other_shards_ready_to_collate {
                    // master always wait for next shard event if any is not ready to collate
                    // not to wait for one more anchor if shard forces master collation
                    current_collation_state.status = CollationStatus::WaitForShardStatus;
                } else {
                    // or resume attempts right now
                    current_collation_state.status = CollationStatus::AttemptsInProgress;
                    shards_to_resume_attempts.push(shard_id);
                }
            } else {
                // otherwise it is ready to collate master
                current_collation_state.status = CollationStatus::ReadyToCollateMaster;
            };

            for (sid, collation_state) in guard.states.iter_mut() {
                // master resumes all waiting shards
                if (shard_id.is_masterchain()
                            && collation_state.status == CollationStatus::WaitForMasterStatus)
                            // shard resumes waiting master
                            || (!shard_id.is_masterchain()
                            && collation_state.status == CollationStatus::WaitForShardStatus)
                {
                    collation_state.status = CollationStatus::AttemptsInProgress;
                    shards_to_resume_attempts.push(*sid);
                }
            }

            let res = if !shards_to_resume_attempts.is_empty() {
                NextCollationStep::ResumeAttemptsIn(shards_to_resume_attempts)
            } else if shard_id.is_masterchain() {
                NextCollationStep::WaitForShardStatus
            } else {
                NextCollationStep::WaitForMasterStatus
            };

            tracing::info!(
                target: tracing_targets::COLLATION_MANAGER,
                shard_id=?shard_id,
                ?candidates,
                should_collate_by_current_shard,
                should_collate_by_every_shard,
                any_has_shard_block_with_externals,
                hard_forced_for_all,
                mc_forced_by_no_pending_msgs_on_ct,
                mc_collation_state_exist,
                mc_block_min_interval_ms,
                mc_block_max_interval_ms,
                decision=?res,
                "step decision"
            );

            return res;
        }

        // Otherwise: collate MC block using max candidate ct
        // (`unwrap` cannot fail here: `candidates` is non-empty and every entry is `Some`)
        let next_mc_block_chain_time = candidates.into_iter().flatten().max().unwrap();

        // Mark all shards as "running" (attempts in progress)
        for (sid, st) in guard.states.iter_mut() {
            st.status = CollationStatus::AttemptsInProgress;

            // we may use not last imported chain time to collate master
            // so we prune all cached chain times for master above next chain time
            // so next anchors will be imported again
            if sid.is_masterchain() {
                st.last_imported_anchor_events
                    .retain(|s| s.ct <= next_mc_block_chain_time);
            }
        }

        // Update MC block time and reset force flags
        Self::renew_mc_block_latest_chain_time(guard, next_mc_block_chain_time);

        // drop "forced for all" and "forced by no pending messages" flags
        // if we decided to collate master
        guard.mc_collation_forced_for_all = false;
        guard.mc_forced_by_no_pending_msgs_on_ct = None;

        let res = NextCollationStep::CollateMaster(next_mc_block_chain_time);

        tracing::info!(
            target: tracing_targets::COLLATION_MANAGER,
            shard_id=?shard_id,
            should_collate_by_current_shard,
            should_collate_by_every_shard,
            any_has_shard_block_with_externals,
            hard_forced_for_all,
            mc_forced_by_no_pending_msgs_on_ct,
            mc_collation_state_exist,
            mc_block_min_interval_ms,
            mc_block_max_interval_ms,
            decision=?res,
            "step decision"
        );

        res
    }
3243
3244    /// Enqueue master block collation task. Will determine top shard blocks for this collation
3245    async fn enqueue_mc_block_collation(
3246        &self,
3247        next_mc_block_id_short: BlockIdShort,
3248        next_mc_block_chain_time: u64,
3249        _trigger_block_id_opt: Option<BlockId>,
3250    ) -> Result<()> {
3251        let _histogram = HistogramGuard::begin("tycho_collator_enqueue_mc_block_collation_time");
3252
3253        // get masterchain collator if exists
3254        let Some(mc_collator) = self
3255            .active_collators
3256            .get(&ShardIdent::MASTERCHAIN)
3257            .map(|r| r.collator.clone())
3258        else {
3259            bail!("Masterchain collator is not started yet!");
3260        };
3261
3262        // TODO: How to choose top shard blocks for master block collation when they are collated async and in parallel?
3263        //      We know the last anchor (An) used in shard (ShA) block that causes master block collation,
3264        //      so we search for block from other shard (ShB) that includes the same anchor (An).
3265        //      Or the first from previouses (An-x) that includes externals for that shard (ShB)
3266        //      if all next including required one ([An-x+1, An]) do not contain externals for shard (ShB).
3267
3268        let top_shard_blocks_info = self
3269            .blocks_cache
3270            .get_top_shard_blocks_info_for_mc_block(next_mc_block_id_short)?;
3271
3272        mc_collator
3273            .enqueue_do_collate(top_shard_blocks_info, next_mc_block_chain_time)
3274            .await?;
3275
3276        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
3277            "Master block collation enqueued: (block_id={} ct={})",
3278            next_mc_block_id_short,
3279            next_mc_block_chain_time,
3280        );
3281
3282        Ok(())
3283    }
3284
3285    async fn enqueue_try_collate(&self, shard_id: &ShardIdent) -> Result<()> {
3286        // get collator if exists
3287        let Some(collator) = self
3288            .active_collators
3289            .get(shard_id)
3290            .map(|r| r.collator.clone())
3291        else {
3292            tracing::warn!(
3293                target: tracing_targets::COLLATION_MANAGER,
3294                "Node does not collate blocks for shard {}",
3295                shard_id,
3296            );
3297            return Ok(());
3298        };
3299
3300        collator.enqueue_try_collate().await?;
3301
3302        tracing::info!(
3303            target: tracing_targets::COLLATION_MANAGER,
3304            "Enqueued next attempt to collate block for shard {}",
3305            shard_id,
3306        );
3307
3308        Ok(())
3309    }
3310
3311    /// Process validated block
3312    /// 1. Process invalid block (currently, just panic)
3313    /// 2. Update block in cache with validation info
3314    /// 2. Execute processing for master or shard block
3315    #[tracing::instrument(skip_all, fields(block_id = %block_id.as_short_id()))]
3316    pub async fn handle_validated_master_block(
3317        &self,
3318        block_id: BlockId,
3319        status: ValidationStatus,
3320    ) -> Result<()> {
3321        tracing::debug!(
3322            target: tracing_targets::COLLATION_MANAGER,
3323            is_complete = matches!(&status, ValidationStatus::Complete(_)),
3324            "Start processing block validation result...",
3325        );
3326
3327        let _histogram = HistogramGuard::begin("tycho_collator_handle_validated_master_block_time");
3328
3329        // update block validation status
3330        let updated = self
3331            .blocks_cache
3332            .store_master_block_validation_result(&block_id, status);
3333        if !updated {
3334            return Ok(());
3335        }
3336
3337        self.ready_to_sync.notified().await;
3338
3339        // process valid block
3340        self.commit_valid_master_block(&block_id).await?;
3341
3342        self.ready_to_sync.notify_one();
3343
3344        Ok(())
3345    }
3346
    /// Commits a validated master block unless its subgraph was already extracted
    /// (i.e. committed) before.
    ///
    /// Order of operations as implemented:
    /// 1. Run `notify_to_mempool_and_process_delayed_mc_state_update`. It may yield a
    ///    `top_processed_to_anchor` to report to mempool.
    /// 2. Extract the master block subgraph (master block plus shard blocks) from the
    ///    cache. If it was already extracted, only a debug log is written for this step.
    /// 3. Set the cache GC boundary to the master block's top blocks keys.
    /// 4. The next action depends on where the master block came from:
    ///    - Collated here and not received from bc after collation: send the master and
    ///      shard blocks to sync. The mempool notification is then suppressed, because it
    ///      should wait for the block accepted event.
    ///    - Otherwise: use the master block's own `top_processed_to_anchor` for notification.
    /// 5. Commit the queue diff via `commit_block_queue_diff`. It receives the master
    ///    block id, its top shard blocks info and the subgraph partitions.
    /// 6. Notify mempool of the top processed anchor, if one remains.
    ///
    /// `blocks_cache.gc_prev_blocks()` runs on every exit path via a scope guard.
    /// NOTE(review): no explicit validity check is visible here. Presumably
    /// `extract_mc_block_subgraph_for_sync` only extracts valid blocks; confirm.
    ///
    /// # Errors
    /// Propagates errors from delayed state update processing, top blocks keys lookup,
    /// sending blocks to sync, queue diff commit and the mempool notification.
    async fn commit_valid_master_block(&self, mc_block_id: &BlockId) -> Result<()> {
        tracing::debug!(
            target: tracing_targets::COLLATION_MANAGER,
            "Start to commit validated and valid master block ({})...",
            mc_block_id.as_short_id(),
        );

        // gc blocks from cache when commit finished
        scopeguard::defer!(self.blocks_cache.gc_prev_blocks());

        // we can process delayed master state update now
        let mut top_processed_to_anchor_to_notify = self
            .notify_to_mempool_and_process_delayed_mc_state_update(mc_block_id)
            .await?;

        let histogram = HistogramGuard::begin("tycho_collator_commit_valid_master_block_time");
        let histogram_extract =
            HistogramGuard::begin("tycho_collator_extract_master_block_subgraph_time");
        // stay zero when the corresponding step is skipped
        let mut extract_elapsed = Default::default();
        let mut sync_elapsed = Default::default();

        // extract master block with all shard blocks if valid, and process them
        match self
            .blocks_cache
            .extract_mc_block_subgraph_for_sync(mc_block_id)
        {
            McBlockSubgraphExtract::Extracted(subgraph) => {
                extract_elapsed = histogram_extract.finish();

                // partitions are taken before `subgraph` is destructured below
                let partitions = subgraph.get_partitions();

                let McBlockSubgraph {
                    master_block,
                    shard_blocks,
                } = subgraph;

                // we can gc upto to current master block after commit
                // because we do not need to commit all previous blocks
                // because all previous blocks are already in bc state
                // and all previous diffs already committed by current one
                let to_blocks_keys = master_block.get_top_blocks_keys()?;
                self.blocks_cache.set_gc_to_boundary(&to_blocks_keys);

                // send to sync only if was not received from bc
                if matches!(&master_block.data, BlockCacheEntryData::Collated {
                    received_after_collation: false,
                    ..
                }) {
                    let histogram =
                        HistogramGuard::begin("tycho_collator_send_blocks_to_sync_time");

                    self.send_block_to_sync(master_block.data)?;

                    for shard_block in shard_blocks {
                        self.send_block_to_sync(shard_block.data)?;
                    }

                    sync_elapsed = histogram.finish();
                    tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                        total = sync_elapsed.as_millis(),
                        "send_blocks_to_sync timings",
                    );

                    // if current master block was not applied to bc state yet
                    // then we should not notify top processed to anchor to mempool now
                    // because we are not sure that block will be applied
                    // and should wait until block_accepted event is received
                    top_processed_to_anchor_to_notify = None;
                } else {
                    // if current block was committed by received one
                    // then `on_block_accepted` will not be called further
                    // so we need to notify `top_processed_to_anchor` to mempool here
                    top_processed_to_anchor_to_notify = master_block.top_processed_to_anchor;
                }

                let _histogram =
                    HistogramGuard::begin("tycho_collator_send_blocks_to_sync_commit_diffs_time");

                Self::commit_block_queue_diff(
                    self.mq_adapter.clone(),
                    &master_block.block_id,
                    &master_block.top_shard_blocks_info,
                    &partitions,
                )?;
            }
            McBlockSubgraphExtract::AlreadyExtracted => {
                tracing::debug!(
                    target: tracing_targets::COLLATION_MANAGER,
                    "Master block subgraph is already extracted and cleaned up from cache ({}). Do nothing",
                    mc_block_id.as_short_id(),
                );
            }
        }

        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
            total = histogram.finish().as_millis(),
            extract_subgraph = extract_elapsed.as_millis(),
            sync = sync_elapsed.as_millis(),
            "commit_valid_master_block timings",
        );

        // report last processed anchor to mempool
        if let Some(top_processed_to_anchor) = top_processed_to_anchor_to_notify {
            self.notify_top_processed_to_anchor_to_mempool(
                mc_block_id.seqno,
                top_processed_to_anchor,
            )
            .await?;
        }

        Ok(())
    }
3468
3469    fn send_block_to_sync(&self, data: BlockCacheEntryData) -> Result<()> {
3470        let candidate_stuff = match data {
3471            BlockCacheEntryData::Collated {
3472                candidate_stuff,
3473                status,
3474                received_after_collation: false,
3475                ..
3476            } if status != CandidateStatus::Synced => candidate_stuff,
3477            _ => return Ok(()),
3478        };
3479
3480        let block_id = *candidate_stuff.candidate.block.id();
3481        self.state_node_adapter
3482            .accept_block(candidate_stuff.into_block_for_sync())?;
3483        tracing::debug!(
3484            target: tracing_targets::COLLATION_MANAGER,
3485            "Block was successfully sent to sync ({})",
3486            block_id,
3487        );
3488        Ok(())
3489    }
3490
3491    /// Collect top blocks seqno from all shards by master block id. Master block seqno included.
3492    /// Returns None when unable to read related top shard blocks info.
3493    async fn get_top_blocks_seqno(
3494        mc_block_id: &BlockId,
3495        blocks_cache: &BlocksCache,
3496        state_node_adapter: Arc<dyn StateNodeAdapter>,
3497    ) -> Result<Option<FastHashMap<ShardIdent, BlockSeqno>>> {
3498        let mut result = FastHashMap::default();
3499
3500        result.insert(mc_block_id.shard, mc_block_id.seqno);
3501
3502        // try get top shard blocks from cache first
3503        if let Some(top_shard_blocks) = blocks_cache.get_top_shard_blocks(mc_block_id.as_short_id())
3504        {
3505            result.extend(top_shard_blocks);
3506            return Ok(Some(result));
3507        }
3508
3509        // then try read from state
3510        match state_node_adapter
3511            .load_state(mc_block_id.seqno, mc_block_id, Default::default())
3512            .await
3513        {
3514            Err(err) => match err.downcast_ref::<StateNotFound>() {
3515                Some(_) => {
3516                    tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
3517                        %mc_block_id,
3518                        "master state not found in get_top_blocks_seqno",
3519                    );
3520                }
3521                _ => {
3522                    tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
3523                        ?err,
3524                        "error loading master state in get_top_blocks_seqno",
3525                    );
3526                }
3527            },
3528            Ok(state) => {
3529                for item in state.shards()?.latest_blocks() {
3530                    let top_sb = item?;
3531                    result.insert(top_sb.shard, top_sb.seqno);
3532                }
3533                return Ok(Some(result));
3534            }
3535        }
3536
3537        // finally try read from block data
3538        match state_node_adapter.load_block(mc_block_id).await {
3539            Err(err) => {
3540                tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
3541                    %mc_block_id,
3542                    ?err,
3543                    "error loading master block in get_top_blocks_seqno",
3544                );
3545            }
3546            Ok(None) => {
3547                tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
3548                    %mc_block_id,
3549                    "master block was not found in get_top_blocks_seqno",
3550                );
3551            }
3552            Ok(Some(mc_block_stuff)) => {
3553                let top_blocks = TopBlocks::from_mc_block(&mc_block_stuff)?;
3554                for top_sb in top_blocks.shard_heights().iter() {
3555                    result.insert(top_sb.shard, top_sb.seqno);
3556                }
3557                return Ok(Some(result));
3558            }
3559        }
3560
3561        // unable to load related shard blocks info, return None
3562        Ok(None)
3563    }
3564}
3565
/// Mode in which a master state update is processed.
///
/// NOTE(review): the variant descriptions below are inferred from their names;
/// confirm against the call sites.
#[derive(Debug)]
enum ProcessMcStateUpdateMode {
    /// Process the update and then start collation.
    StartCollation {
        /// Presumably whether collators should be reset first.
        reset_collators: bool,
    },
    /// Process the update without starting collation.
    SkipCollation,
}
3571
3572#[derive(Default)]
3573struct RestoreQueueResult {
3574    last_mc_state: Option<ShardStateStuff>,
3575    prev_mc_state: Option<ShardStateStuff>,
3576    prev_mc_block_id: Option<BlockId>,
3577    synced_to_blocks_keys: Vec<BlockCacheKey>,
3578    applied_diffs_ids: FastHashSet<BlockId>,
3579}
3580
3581// TODO: Move into `tycho_types`.
3582trait GlobalCapabilitiesExt {
3583    /// Checks whether this capabilities set is fully
3584    /// included into `other` (comparing raw bits to include unnamed variants).
3585    fn is_subset_of(&self, other: GlobalCapabilities) -> bool;
3586}
3587
3588impl GlobalCapabilitiesExt for GlobalCapabilities {
3589    fn is_subset_of(&self, other: GlobalCapabilities) -> bool {
3590        let this = self.into_inner();
3591        let other = other.into_inner();
3592        this & !other == 0
3593    }
3594}
3595
3596#[derive(Debug)]
3597struct DetectNextCollationStepContext {
3598    pub last_imported_anchor_ct: u64,
3599    pub force_mc_block: ForceMasterCollation,
3600    pub mc_block_min_interval_ms: u64,
3601    pub mc_block_max_interval_ms: u64,
3602    pub collated_block_info: Option<CollatedBlockInfo>,
3603}
3604
3605impl DetectNextCollationStepContext {
3606    pub fn new(
3607        last_imported_anchor_ct: u64,
3608        force_mc_block: ForceMasterCollation,
3609        mc_block_min_interval_ms: u64,
3610        mc_block_max_interval_ms: u64,
3611        collated_block_info: Option<CollatedBlockInfo>,
3612    ) -> Self {
3613        Self {
3614            last_imported_anchor_ct,
3615            force_mc_block,
3616            mc_block_min_interval_ms,
3617            mc_block_max_interval_ms,
3618            collated_block_info,
3619        }
3620    }
3621}