Skip to main content

tycho_collator/collator/
mod.rs

1use std::pin::Pin;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anchors_cache::{AnchorInfo, AnchorsCache};
6use anyhow::{Context, Result};
7use async_trait::async_trait;
8use error::CollatorError;
9use futures_util::future::Future;
10use messages_reader::{MessagesReader, MessagesReaderContext};
11use tokio::sync::{Notify, oneshot};
12use tokio_util::sync::CancellationToken;
13use tracing::Instrument;
14use tycho_block_util::block::calc_next_block_id_short;
15use tycho_block_util::state::{ShardStateStuff, choose_genesis_info};
16use tycho_core::global_config::{MempoolGlobalConfig, ZerostateId};
17use tycho_core::storage::LoadStateHint;
18use tycho_network::PeerId;
19use tycho_types::models::*;
20use tycho_types::prelude::*;
21use tycho_util::futures::JoinTask;
22use tycho_util::mem::Reclaimer;
23use tycho_util::metrics::{HistogramGuard, HistogramGuardWithLabels};
24use tycho_util::time::now_millis;
25use types::MsgsExecutionParamsStuff;
26
27use self::types::{BlockSerializerCache, CollatorStats, PrevData, WorkingState};
28use crate::internal_queue::types::message::EnqueuedMessage;
29use crate::mempool::{GetAnchorResult, MempoolAdapter, MempoolAnchorId};
30use crate::queue_adapter::MessageQueueAdapter;
31use crate::state_node::StateNodeAdapter;
32use crate::types::processed_upto::ProcessedUptoInfoExtension;
33use crate::types::{
34    BlockCollationResult, CollationSessionId, CollationSessionInfo, CollatorConfig, DebugDisplay,
35    DisplayBlockIdsIntoIter, McData, TopBlockDescription,
36};
37use crate::utils::async_queued_dispatcher::{
38    AsyncQueuedDispatcher, STANDARD_QUEUED_DISPATCHER_BUFFER_SIZE,
39};
40use crate::{method_to_queued_async_closure, tracing_targets};
41
42mod anchors_cache;
43mod debug_info;
44mod do_collate;
45mod error;
46mod execution_manager;
47mod messages_buffer;
48mod messages_reader;
49mod types;
50
51#[cfg(any(test, feature = "bench-helpers"))]
52mod test_utils;
53
54#[cfg(feature = "bench-helpers")]
55pub mod bench_export {
56    pub use super::messages_buffer::{IncludeAllMessages, MessageGroup, MessagesBuffer};
57    pub use super::test_utils::make_stub_internal_parsed_message;
58    pub use super::types::ParsedMessage;
59}
60
61pub use do_collate::{is_first_block_after_prev_master, work_units};
62pub use error::CollationCancelReason;
63use messages_reader::state::ReaderState;
64pub use types::{ForceMasterCollation, ShardDescriptionExt};
65
66mod state;
67mod statistics;
68#[cfg(test)]
69#[path = "tests/collator_tests.rs"]
70pub(super) mod tests;
71
72#[cfg(test)]
73pub(crate) use messages_reader::tests::{TestInternalMessage, TestMessageFactory};
74
75use crate::collator::anchors_cache::AnchorsCacheTransaction;
76// FACTORY
77
78pub struct CollatorContext {
79    pub mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
80    pub mpool_adapter: Arc<dyn MempoolAdapter>,
81    pub state_node_adapter: Arc<dyn StateNodeAdapter>,
82    pub config: Arc<CollatorConfig>,
83    pub collation_session: Arc<CollationSessionInfo>,
84    pub listener: Arc<dyn CollatorEventListener>,
85    pub shard_id: ShardIdent,
86    pub prev_blocks_ids: Vec<BlockId>,
87    pub mc_data: Arc<McData>,
88    pub mempool_config_override: Option<MempoolGlobalConfig>,
89
90    /// For graceful collation cancellation
91    pub cancel_collation: Arc<Notify>,
92    pub zerostate_id: ZerostateId,
93}
94
95#[async_trait]
96pub trait CollatorFactory: Send + Sync + 'static {
97    type Collator: Collator;
98
99    async fn start(&self, cx: CollatorContext) -> Result<Self::Collator>;
100}
101
102#[async_trait]
103impl<F, FT, R> CollatorFactory for F
104where
105    F: Fn(CollatorContext) -> FT + Send + Sync + 'static,
106    FT: Future<Output = Result<R>> + Send + 'static,
107    R: Collator,
108{
109    type Collator = R;
110
111    async fn start(&self, cx: CollatorContext) -> Result<Self::Collator> {
112        self(cx).await
113    }
114}
115
116// EVENTS LISTENER
117
118#[async_trait]
119pub trait CollatorEventListener: Send + Sync {
120    /// Process block collation skip by any reason
121    async fn on_skipped(
122        &self,
123        prev_mc_block_id: BlockId,
124        next_block_id_short: BlockIdShort,
125        anchor_chain_time: u64,
126        force_mc_block: ForceMasterCollation,
127        collation_config: Arc<CollationConfig>,
128    ) -> Result<()>;
129    /// Handle when collator action was cancelled
130    async fn on_cancelled(
131        &self,
132        prev_mc_block_id: BlockId,
133        next_block_id_short: BlockIdShort,
134        cancel_reason: CollationCancelReason,
135    ) -> Result<()>;
136    /// Process new collated shard or master block
137    async fn on_block_candidate(&self, collation_result: BlockCollationResult) -> Result<()>;
138    /// Process collator stopped event
139    async fn on_collator_stopped(&self, collation_session_id: CollationSessionId) -> Result<()>;
140}
141
142// COLLATOR
143
144#[async_trait]
145pub trait Collator: Send + Sync + 'static {
146    /// Enqueue collator stop task
147    async fn enqueue_stop(&self) -> Result<()>;
148    /// Enqueue update `McData` if newer, reset `PrevData` and run next collation attempt
149    async fn enqueue_resume_collation(
150        &self,
151        mc_data: Arc<McData>,
152        reset: bool,
153        collation_session: Arc<CollationSessionInfo>,
154        prev_blocks_ids: Vec<BlockId>,
155    ) -> Result<()>;
156    /// Enqueue next attemt to collate block
157    /// (with check if there are internals or externals for collation).
158    /// Check implementation for master and shards for details
159    async fn enqueue_try_collate(&self) -> Result<()>;
160    /// Enqueue new block collation (without check of internals and externals)
161    async fn enqueue_do_collate(
162        &self,
163        top_shard_blocks_info: Vec<TopBlockDescription>,
164        next_chain_time: u64,
165    ) -> Result<()>;
166}
167
168pub struct CollatorStdImplFactory {
169    pub wu_tuner_event_sender: Option<tokio::sync::mpsc::Sender<work_units::WuEvent>>,
170}
171
172#[async_trait]
173impl CollatorFactory for CollatorStdImplFactory {
174    type Collator = AsyncQueuedDispatcher<CollatorStdImpl>;
175
176    async fn start(&self, cx: CollatorContext) -> Result<Self::Collator> {
177        CollatorStdImpl::start(
178            cx.mq_adapter,
179            cx.mpool_adapter,
180            cx.state_node_adapter,
181            cx.config,
182            cx.collation_session,
183            cx.listener,
184            cx.zerostate_id,
185            cx.shard_id,
186            cx.prev_blocks_ids,
187            cx.mc_data,
188            cx.mempool_config_override,
189            cx.cancel_collation,
190            self.wu_tuner_event_sender.clone(),
191        )
192        .await
193    }
194}
195
196#[async_trait]
197impl Collator for AsyncQueuedDispatcher<CollatorStdImpl> {
198    async fn enqueue_stop(&self) -> Result<()> {
199        let cancel_token = self.cancel_token().clone();
200        self.enqueue_task(method_to_queued_async_closure!(stop_collator, cancel_token))
201            .await
202    }
203
204    /// Enqueue update `McData` if newer, reset `PrevData` if required and run next collation attempt
205    async fn enqueue_resume_collation(
206        &self,
207        mc_data: Arc<McData>,
208        reset: bool,
209        collation_session: Arc<CollationSessionInfo>,
210        prev_blocks_ids: Vec<BlockId>,
211    ) -> Result<()> {
212        self.enqueue_task(method_to_queued_async_closure!(
213            resume_collation_wrapper,
214            mc_data,
215            reset,
216            collation_session,
217            prev_blocks_ids
218        ))
219        .await
220    }
221
222    async fn enqueue_try_collate(&self) -> Result<()> {
223        self.enqueue_task(method_to_queued_async_closure!(
224            wait_state_and_try_collate_wrapper,
225        ))
226        .await
227    }
228
229    async fn enqueue_do_collate(
230        &self,
231        top_shard_blocks_info: Vec<TopBlockDescription>,
232        next_chain_time: u64,
233    ) -> Result<()> {
234        self.enqueue_task(method_to_queued_async_closure!(
235            wait_state_and_do_collate_wrapper,
236            top_shard_blocks_info,
237            next_chain_time
238        ))
239        .await
240    }
241}
242
243pub struct CollatorStdImpl {
244    next_block_info: BlockIdShort,
245
246    config: Arc<CollatorConfig>,
247    collation_session: Arc<CollationSessionInfo>,
248
249    listener: Arc<dyn CollatorEventListener>,
250    mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
251    mpool_adapter: Arc<dyn MempoolAdapter>,
252    state_node_adapter: Arc<dyn StateNodeAdapter>,
253    shard_id: ShardIdent,
254
255    delayed_working_state: DelayedWorkingState,
256
257    background_store_new_state_tx:
258        tokio::sync::mpsc::UnboundedSender<JoinTask<Result<ShardStateStuff>>>,
259
260    anchors_cache: AnchorsCache,
261    block_serializer_cache: BlockSerializerCache,
262    stats: CollatorStats,
263    timer: std::time::Instant,
264    anchor_timer: std::time::Instant,
265    shard_blocks_count_from_last_anchor: u16,
266
267    /// Mempool config override from the Global config.
268    /// Will exist on the next restarts because the config file remains,
269    /// but it will be outdated.
270    mempool_config_override: Option<MempoolGlobalConfig>,
271
272    /// For graceful collation cancellation
273    cancel_collation: Arc<Notify>,
274
275    /// Events sender for Work Units tuner service
276    wu_tuner_event_sender: Option<tokio::sync::mpsc::Sender<work_units::WuEvent>>,
277    zerostate_id: ZerostateId,
278}
279
280impl CollatorStdImpl {
281    #[allow(clippy::too_many_arguments)]
282    pub async fn start(
283        mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
284        mpool_adapter: Arc<dyn MempoolAdapter>,
285        state_node_adapter: Arc<dyn StateNodeAdapter>,
286        config: Arc<CollatorConfig>,
287        collation_session: Arc<CollationSessionInfo>,
288        listener: Arc<dyn CollatorEventListener>,
289        zerostate_id: ZerostateId,
290        shard_id: ShardIdent,
291        prev_blocks_ids: Vec<BlockId>,
292        mc_data: Arc<McData>,
293        mempool_config_override: Option<MempoolGlobalConfig>,
294        cancel_collation: Arc<Notify>,
295        wu_tuner_event_sender: Option<tokio::sync::mpsc::Sender<work_units::WuEvent>>,
296    ) -> Result<AsyncQueuedDispatcher<Self>> {
297        const BLOCK_CELL_COUNT_BASELINE: usize = 100_000;
298
299        let next_block_info = calc_next_block_id_short(&prev_blocks_ids);
300
301        tracing::info!(target: tracing_targets::COLLATOR,
302            "(next_block_id={}): collator starting...", next_block_info,
303        );
304
305        let (working_state_tx, working_state_rx) = oneshot::channel::<Result<Box<WorkingState>>>();
306
307        let (background_store_new_state_tx, mut background_store_new_state_rx) =
308            tokio::sync::mpsc::unbounded_channel();
309
310        let processor = Self {
311            next_block_info,
312            config,
313            collation_session,
314            listener,
315            mq_adapter,
316            mpool_adapter,
317            state_node_adapter,
318            shard_id,
319            delayed_working_state: DelayedWorkingState::new(shard_id, async move {
320                match working_state_rx.await {
321                    Ok(state) => state,
322                    Err(_) => anyhow::bail!("collator init cancelled"),
323                }
324            }),
325            background_store_new_state_tx,
326            anchors_cache: Default::default(),
327            block_serializer_cache: BlockSerializerCache::with_capacity(BLOCK_CELL_COUNT_BASELINE),
328            stats: Default::default(),
329            timer: std::time::Instant::now(),
330            anchor_timer: std::time::Instant::now(),
331            shard_blocks_count_from_last_anchor: 0,
332            mempool_config_override,
333            cancel_collation,
334            wu_tuner_event_sender,
335            zerostate_id,
336        };
337
338        // finalize new state store tasks in background
339        tokio::spawn(async move {
340            while let Some(task) = background_store_new_state_rx.recv().await {
341                if let Err(err) = task.await {
342                    tracing::error!(target: tracing_targets::COLLATOR,
343                        "Error when store new state: {:?}", err,
344                    );
345                }
346            }
347        });
348
349        // create dispatcher for own async tasks queue
350        let dispatcher =
351            AsyncQueuedDispatcher::create(processor, STANDARD_QUEUED_DISPATCHER_BUFFER_SIZE);
352        tracing::trace!(target: tracing_targets::COLLATOR,
353            "(next_block_id={}): collator tasks queue dispatcher started", next_block_info,
354        );
355
356        // equeue first initialization task
357        // sending to the receiver here cannot return Error because it is guaranteed not closed or dropped
358        dispatcher
359            .enqueue_task(method_to_queued_async_closure!(
360                init_collator_wrapper,
361                prev_blocks_ids,
362                mc_data,
363                working_state_tx
364            ))
365            .await
366            .context("task receiver had to be not closed or dropped here")?;
367        tracing::info!(target: tracing_targets::COLLATOR,
368            "(next_block_id={}): collator initialization task enqueued", next_block_info,
369        );
370
371        tracing::info!(target: tracing_targets::COLLATOR,
372            "(next_block_id={}): collator started", next_block_info,
373        );
374
375        Ok(dispatcher)
376    }
377
378    async fn stop_collator(&mut self, dispatcher_cancel_token: CancellationToken) -> Result<()> {
379        self.listener
380            .on_collator_stopped(self.collation_session.id())
381            .await?;
382        dispatcher_cancel_token.cancel();
383        Ok(())
384    }
385
386    async fn init_collator_wrapper(
387        &mut self,
388        prev_blocks_ids: Vec<BlockId>,
389        mc_data: Arc<McData>,
390        working_state_tx: oneshot::Sender<Result<Box<WorkingState>>>,
391    ) -> Result<()> {
392        Box::pin(self.init_collator(prev_blocks_ids, mc_data, working_state_tx))
393            .await
394            .with_context(|| format!("next_block_id: {}", self.next_block_info))
395    }
396
397    // Initialize collator working state then run collation
398    #[tracing::instrument(skip_all, fields(next_block_id = %self.next_block_info))]
399    async fn init_collator(
400        &mut self,
401        prev_blocks_ids: Vec<BlockId>,
402        mc_data: Arc<McData>,
403        working_state_tx: oneshot::Sender<Result<Box<WorkingState>>>,
404    ) -> Result<()> {
405        tracing::info!(target: tracing_targets::COLLATOR, "initializing...");
406
407        // init working state
408        let mut working_state = Self::init_working_state(
409            &self.next_block_info,
410            self.state_node_adapter.clone(),
411            mc_data,
412            prev_blocks_ids,
413        )
414        .await?;
415
416        // get genesis info and reset externals if genesis was updated
417        let HandleGenesisResult {
418            anchors_proc_info_opt,
419            genesis_info,
420            genesis_updated,
421        } = self.handle_mempool_genesis(&mut working_state).await?;
422
423        // try import init anchors
424        if self
425            .try_import_init_anchors(&mut working_state, anchors_proc_info_opt, &genesis_info)
426            .await?
427            .should_cancel()
428        {
429            return Ok(());
430        }
431
432        working_state_tx.send(Ok(working_state)).ok();
433
434        self.timer = std::time::Instant::now();
435
436        self.anchor_timer = std::time::Instant::now();
437
438        tracing::info!(target: tracing_targets::COLLATOR, "init finished");
439
440        // trying to collate next block
441        tracing::info!(target: tracing_targets::COLLATOR, "trying to collate next block after init...");
442        Box::pin(self.wait_state_and_try_collate(genesis_updated)).await?;
443
444        Ok(())
445    }
446
447    /// Get actual genesis info and reset anchors cache and externals buffers
448    /// if genesis was updated from the previous mc block collation
449    async fn handle_mempool_genesis(
450        &mut self,
451        working_state: &mut WorkingState,
452    ) -> Result<HandleGenesisResult> {
453        // get last processed and last imported anchor info (will be None on zerostate)
454        let anchors_proc_info_opt = {
455            let prev_shard_data = working_state.prev_shard_data_ref();
456            let prev_block_id = prev_shard_data.blocks_ids()[0]; // TODO: consider split/merge
457            Self::get_anchors_processing_info(
458                &working_state.next_block_id_short.shard,
459                &working_state.mc_data,
460                &prev_block_id,
461                prev_shard_data.gen_chain_time(),
462                prev_shard_data
463                    .processed_upto()
464                    .get_min_externals_processed_to()?,
465            )
466        };
467
468        // get genesis info and detect if it was updated since the previous mc block collation
469        let (genesis_info, genesis_updated) = choose_genesis_info(
470            working_state.mc_data.consensus_info.genesis_info,
471            working_state
472                .mc_data
473                .prev_mc_data
474                .as_ref()
475                .map(|mcd| mcd.consensus_info.genesis_info),
476            self.mempool_config_override
477                .as_ref()
478                .map(|o| o.genesis_info),
479        );
480
481        tracing::debug!(target: tracing_targets::COLLATOR,
482            ?genesis_info,
483            genesis_updated,
484            "checked if genesis info was updated since the last mc block collation",
485        );
486
487        if !genesis_updated {
488            return Ok(HandleGenesisResult {
489                anchors_proc_info_opt,
490                genesis_info,
491                genesis_updated,
492            });
493        }
494
495        // reset anchors cache
496        self.anchors_cache.clear();
497
498        // needs to set last imported anchor info into cache
499        // to be able to import next anchor for collation
500        if let Some(anchors_proc_info) = &anchors_proc_info_opt {
501            self.set_anchors_cache_last_imported_from_genesis(&genesis_info, anchors_proc_info);
502        }
503
504        // reset externals buffers
505        for externals_range_state in working_state.reader_state.externals.ranges.values_mut() {
506            for partition in externals_range_state.by_partitions.values_mut() {
507                let buffer_to_drop = std::mem::take(&mut partition.buffer);
508                Reclaimer::instance().drop(buffer_to_drop);
509            }
510        }
511
512        tracing::info!(target: tracing_targets::COLLATOR,
513            ?anchors_proc_info_opt,
514            ?genesis_info,
515            genesis_updated,
516            "externals reset on genesis update finished",
517        );
518
519        Ok(HandleGenesisResult {
520            anchors_proc_info_opt,
521            genesis_info,
522            genesis_updated,
523        })
524    }
525
526    fn set_anchors_cache_last_imported_from_genesis(
527        &mut self,
528        genesis_info: &GenesisInfo,
529        anchors_proc_info: &AnchorsProcessingInfo,
530    ) {
531        self.anchors_cache.add_imported_anchor_info(AnchorInfo {
532            id: genesis_info.start_round,
533            ct: anchors_proc_info.last_imported_chain_time,
534            all_exts_count: 0,
535            our_exts_count: 0,
536            // SAFETY: omit author info for dummy anchor,
537            //      it will not be used to collate next block,
538            //      collator will import next anchor for sure
539            //      because processed_to anchor will be equal to last inported
540            author: PeerId(HashBytes::ZERO.0),
541        });
542    }
543
544    async fn check_and_import_init_anchors(
545        &mut self,
546        anchors_proc_info: &AnchorsProcessingInfo,
547        genesis_info: &GenesisInfo,
548    ) -> Result<ImportInitAnchorsResult, CollatorError> {
549        tracing::debug!(target: tracing_targets::COLLATOR,
550            %anchors_proc_info,
551            ?genesis_info,
552            "will check and import init anchors",
553        );
554
555        // detect if we can import init anchors and continue collation
556        let import_init_anchors = if genesis_info.start_round > 0 {
557            // when last processed_to anchor is after genesis start round
558            // we consider that genesis was in the past, so we can import init anchors
559            if anchors_proc_info.processed_to_anchor_id > genesis_info.start_round {
560                true
561            }
562            // when we start from new genesis we unable to import anchor at the start round
563            // because mempool actually is starting from the next round
564            // so we should not try to import init anchors
565            else if anchors_proc_info.processed_to_anchor_id == genesis_info.start_round {
566                false
567            }
568            // if last processed_to anchor is before the start round for master,
569            // then cancel collation because we need to receive more blocks from bc
570            else if self.shard_id.is_masterchain() {
571                return Err(CollatorError::Cancelled(
572                    CollationCancelReason::AnchorNotFound(anchors_proc_info.processed_to_anchor_id),
573                ));
574            }
575            // last processed_to anchor in shard can be before last processed in master
576            // it is normal, so we should not cancel collation but we still unable to import init anchors;
577            // possibly, we will produce an incorrect shard block then take correct from bc and try to collate next one
578            else {
579                false
580            }
581        } else {
582            // required to import init anchors when genesis is a zerostate
583            true
584        };
585
586        if import_init_anchors {
587            tracing::info!(target: tracing_targets::COLLATOR,
588                "importing init anchors from processed to anchor ({}) with offset ({}) \
589                to chain_time {} (current_shard_last_imported_chain_time = {})",
590                anchors_proc_info.processed_to_anchor_id,
591                anchors_proc_info.processed_to_msgs_offset,
592                anchors_proc_info.last_imported_chain_time,
593                anchors_proc_info.current_shard_last_imported_chain_time,
594            );
595
596            Self::import_init_anchors(
597                anchors_proc_info.processed_to_anchor_id,
598                anchors_proc_info.processed_to_msgs_offset,
599                anchors_proc_info.last_imported_chain_time,
600                anchors_proc_info.current_shard_last_imported_chain_time,
601                self.shard_id,
602                &mut self.anchors_cache,
603                self.mpool_adapter.clone(),
604            )
605            .await
606        } else {
607            tracing::info!(target: tracing_targets::COLLATOR,
608                "will not import init anchors",
609            );
610
611            // needs to set last imported anchor info into cache
612            // to be able to import next anchor for collation
613            if self.anchors_cache.last_imported_anchor_info().is_none() {
614                self.set_anchors_cache_last_imported_from_genesis(genesis_info, anchors_proc_info);
615            }
616
617            Ok(Default::default())
618        }
619    }
620
621    /// Try import init anchors if processing info is defined and import required.
622    /// If imported then reduce accumulated wu.
623    async fn try_import_init_anchors(
624        &mut self,
625        working_state: &mut WorkingState,
626        anchors_proc_info_opt: Option<AnchorsProcessingInfo>,
627        genesis_info: &GenesisInfo,
628    ) -> Result<NextCollationFlowStep> {
629        let Some(anchors_proc_info) = anchors_proc_info_opt else {
630            return Ok(NextCollationFlowStep::Continue);
631        };
632
633        let timer = std::time::Instant::now();
634
635        // anchors importing can stuck if mempool paused
636        // so allow to cancel collation here
637        let cancel_collation = self.cancel_collation.clone();
638
639        // await import to be finished or cancelled
640        let import_fut = self.check_and_import_init_anchors(&anchors_proc_info, genesis_info);
641        let import_res = tokio::select! {
642            res = import_fut => res,
643            _ = cancel_collation.notified() => {
644                tracing::info!(target: tracing_targets::COLLATOR,
645                    "collation was cancelled by manager",
646                );
647                let labels = [("workchain", self.shard_id.workchain().to_string())];
648                metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels[..]).increment(1);
649                Err(CollatorError::Cancelled(CollationCancelReason::ExternalCancel))
650            }
651        };
652
653        let ImportInitAnchorsResult {
654            anchors_info,
655            mut anchors_count_above_last_imported_in_current_shard,
656        } = match import_res {
657            Err(CollatorError::Cancelled(reason)) => {
658                self.listener
659                    .on_cancelled(
660                        working_state.mc_data.block_id,
661                        working_state.next_block_id_short,
662                        reason,
663                    )
664                    .await?;
665                return Ok(NextCollationFlowStep::Cancel);
666            }
667            res => res?,
668        };
669
670        if !anchors_info.is_empty() {
671            tracing::debug!(target: tracing_targets::COLLATOR,
672                elapsed = timer.elapsed().as_millis(),
673                new_next_block_id = %self.next_block_info,
674                "imported init anchors: {:?}",
675                anchors_info.as_slice(),
676            );
677
678            // reduce accumulated wu used from last anchor on
679            // the number of anchors imported after last in current shard
680            // if shard was not updated in last master block
681            let wu_used = &mut working_state.wu_used_from_last_anchor;
682            let wu_step = working_state.collation_config.wu_used_to_import_next_anchor;
683            while anchors_count_above_last_imported_in_current_shard > 0 {
684                anchors_count_above_last_imported_in_current_shard -= 1;
685                if let Some(new_wu_used) = wu_used.checked_sub(wu_step) {
686                    *wu_used = new_wu_used;
687                } else {
688                    break;
689                }
690            }
691        }
692
693        Ok(NextCollationFlowStep::Continue)
694    }
695
696    async fn resume_collation_wrapper(
697        &mut self,
698        mc_data: Arc<McData>,
699        reset: bool,
700        collation_session: Arc<CollationSessionInfo>,
701        new_prev_blocks_ids: Vec<BlockId>,
702    ) -> Result<()> {
703        Box::pin(self.resume_collation(mc_data, reset, collation_session, new_prev_blocks_ids))
704            .await
705            .with_context(|| format!("next_block_id: {}", self.next_block_info))
706    }
707
708    #[tracing::instrument(skip_all, fields(next_block_id = %self.next_block_info))]
709    async fn resume_collation(
710        &mut self,
711        mc_data: Arc<McData>,
712        reset: bool,
713        collation_session: Arc<CollationSessionInfo>,
714        new_prev_blocks_ids: Vec<BlockId>,
715    ) -> Result<()> {
716        let labels = [("workchain", self.shard_id.workchain().to_string())];
717        let histogram =
718            HistogramGuard::begin_with_labels("tycho_collator_resume_collation_time_high", &labels);
719
720        // update collation session info to refer to a correct subset in collated block
721        self.collation_session = collation_session;
722
723        // will detect if genesis info was updated from the previous mc block collation
724        let genesis_was_updated;
725
726        let mut working_state = if !reset {
727            let mut working_state = self.delayed_working_state.wait().await?;
728
729            tracing::info!(target: tracing_targets::COLLATOR,
730                mc_data_block_id = %working_state.mc_data.block_id.as_short_id(),
731                "resume collation without reset",
732            );
733
734            // update mc_data if newer
735            let prev_mc_seqno = working_state.mc_data.block_id.seqno;
736            if prev_mc_seqno < mc_data.block_id.seqno {
737                working_state.collation_config = Arc::new(mc_data.config.get_collation_config()?);
738                working_state.mc_data = mc_data;
739
740                if working_state.has_unprocessed_messages == Some(false) {
741                    working_state.has_unprocessed_messages = None;
742                }
743
744                // and only for shard collator
745                // update prev states to drop usage tree
746                if !self.shard_id.is_masterchain() {
747                    Self::reload_prev_data(
748                        prev_mc_seqno,
749                        &mut working_state,
750                        self.state_node_adapter.clone(),
751                        LoadStateHint {
752                            allow_ignore_direct: false,
753                        },
754                    )
755                    .await?;
756                }
757            }
758
759            // reset externals if genesis was updated
760            let HandleGenesisResult {
761                genesis_updated, ..
762            } = self.handle_mempool_genesis(&mut working_state).await?;
763            genesis_was_updated = genesis_updated;
764
765            working_state
766        } else {
767            // reset any delayed working state because we will init a new one
768            self.delayed_working_state.reset();
769
770            self.next_block_info = calc_next_block_id_short(&new_prev_blocks_ids);
771
772            tracing::info!(target: tracing_targets::COLLATOR,
773                mc_data_block_id = %mc_data.block_id.as_short_id(),
774                new_prev_blocks_ids = %DisplayBlockIdsIntoIter(&new_prev_blocks_ids),
775                new_next_block_id = %self.next_block_info,
776                "resume collation with reset",
777            );
778
779            // reload prev data, reinit working state, drop msgs buffer
780            tracing::debug!(target: tracing_targets::COLLATOR,
781                new_next_block_id = %self.next_block_info,
782                "reset working state and msgs buffer",
783            );
784            let mut working_state = Self::init_working_state(
785                &self.next_block_info,
786                self.state_node_adapter.clone(),
787                mc_data,
788                new_prev_blocks_ids,
789            )
790            .await?;
791
792            // get genesis info and reset externals if genesis was updated
793            let HandleGenesisResult {
794                anchors_proc_info_opt,
795                genesis_info,
796                genesis_updated,
797            } = self.handle_mempool_genesis(&mut working_state).await?;
798            genesis_was_updated = genesis_updated;
799
800            // try import init anchors
801            if self
802                .try_import_init_anchors(&mut working_state, anchors_proc_info_opt, &genesis_info)
803                .await?
804                .should_cancel()
805            {
806                return Ok(());
807            }
808
809            working_state
810        };
811
812        // will use time elapsed to resume collation to calculate wu price
813        working_state.resume_collation_elapsed = histogram.finish();
814
815        if self.shard_id.is_masterchain() {
816            Box::pin(self.try_collate_next_master_block_impl(working_state, genesis_was_updated))
817                .await
818        } else {
819            Box::pin(self.try_collate_next_shard_block_impl(working_state, genesis_was_updated))
820                .await
821        }
822    }
823
824    #[tracing::instrument(skip_all, fields(next_block_id = %next_block_id_short))]
825    async fn init_working_state(
826        next_block_id_short: &BlockIdShort,
827        state_node_adapter: Arc<dyn StateNodeAdapter>,
828        mc_data: Arc<McData>,
829        prev_blocks_ids: Vec<BlockId>,
830    ) -> Result<Box<WorkingState>> {
831        // load prev states and queue diff hashes
832        tracing::debug!(target: tracing_targets::COLLATOR,
833            prev_blocks_ids = %DisplayBlockIdsIntoIter(&prev_blocks_ids),
834            "loading prev states and queue diffs...",
835        );
836        let (prev_states, prev_queue_diff_hashes) = Self::load_states_and_diffs(
837            mc_data.block_id.seqno,
838            state_node_adapter,
839            prev_blocks_ids,
840        )
841        .await?;
842
843        // build and validate working state
844        tracing::debug!(target: tracing_targets::COLLATOR, "building working state...");
845
846        Self::build_and_validate_init_working_state(mc_data, prev_states, prev_queue_diff_hashes)
847    }
848
849    async fn reload_prev_data(
850        prev_mc_seqno: u32,
851        working_state: &mut WorkingState,
852        state_node_adapter: Arc<dyn StateNodeAdapter>,
853        hint: LoadStateHint,
854    ) -> Result<()> {
855        // drop prev usage tree
856        working_state.usage_tree.take();
857
858        // get prev shard data
859        let prev_shard_data = working_state
860            .prev_shard_data
861            .as_ref()
862            .expect("should exist here");
863        let prev_queue_diff_hashes = prev_shard_data.prev_queue_diff_hashes().clone();
864        let prev_blocks_ids = prev_shard_data.blocks_ids();
865
866        // reload prev state
867        tracing::debug!(target: tracing_targets::COLLATOR,
868            prev_blocks_ids = %DisplayBlockIdsIntoIter(prev_blocks_ids),
869            "loading prev states...",
870        );
871        let prev_states = Self::load_prev_states(
872            prev_mc_seqno,
873            state_node_adapter.as_ref(),
874            prev_blocks_ids,
875            hint,
876        )
877        .await?;
878
879        // update working state
880        tracing::debug!(target: tracing_targets::COLLATOR, "updating prev data in working state from reloaded state root...");
881
882        let (prev_shard_data, usage_tree) = PrevData::build(prev_states, prev_queue_diff_hashes)?;
883
884        // set new prev shard data and usage tree
885        working_state.prev_shard_data = Some(prev_shard_data);
886        working_state.usage_tree = Some(usage_tree);
887
888        Ok(())
889    }
890
891    /// Build working state update that would be applied before next collation
892    #[allow(clippy::too_many_arguments)]
893    fn prepare_working_state_update(
894        &mut self,
895        new_observable_state: ShardStateStuff,
896        store_new_state_task: JoinTask<Result<ShardStateStuff>>,
897        new_queue_diff_hash: HashBytes,
898        new_mc_data: Arc<McData>,
899        collation_config: Arc<CollationConfig>,
900        has_unprocessed_messages: bool,
901        reader_state: ReaderState,
902        resume_collation_elapsed: Duration,
903    ) -> Result<()> {
904        enum GetNewShardStateStuff {
905            ReloadFromStorage(JoinTask<Result<ShardStateStuff>>),
906            BuildFromNewObservable(ShardStateStuff),
907        }
908
909        let block_id = *new_observable_state.block_id();
910
911        let ref_mc_state_handle = new_observable_state.ref_mc_state_handle().clone();
912        let get_new_state_stuff = if block_id.is_masterchain() {
913            GetNewShardStateStuff::ReloadFromStorage(store_new_state_task)
914        } else {
915            self.background_store_new_state_tx
916                .send(store_new_state_task)
917                .ok();
918
919            GetNewShardStateStuff::BuildFromNewObservable(new_observable_state)
920        };
921
922        let labels = [("workchain", self.shard_id.workchain().to_string())];
923        self.delayed_working_state.future = Some(Box::pin(async move {
924            let _histogram = HistogramGuard::begin_with_labels(
925                "tycho_collator_build_new_state_time_high",
926                &labels,
927            );
928
929            let new_state_stuff = match get_new_state_stuff {
930                GetNewShardStateStuff::BuildFromNewObservable(state) => state,
931                GetNewShardStateStuff::ReloadFromStorage(store_new_state_task) => {
932                    store_new_state_task.await?
933                }
934            };
935
936            // Ensure that the original min ref handle outlives the store task.
937            // There is no explanation to this, but it mimics the previous
938            // behavour.
939            drop(ref_mc_state_handle);
940
941            let prev_states = vec![new_state_stuff];
942            let prev_queue_diff_hashes = vec![new_queue_diff_hash];
943            let (prev_shard_data, usage_tree) =
944                PrevData::build(prev_states, prev_queue_diff_hashes)?;
945
946            let next_block_id_short = calc_next_block_id_short(prev_shard_data.blocks_ids());
947
948            Ok(Box::new(WorkingState {
949                next_block_id_short,
950                mc_data: new_mc_data,
951                collation_config,
952                wu_used_from_last_anchor: prev_shard_data.wu_used_from_last_anchor(),
953                resume_collation_elapsed,
954                prev_shard_data: Some(prev_shard_data),
955                usage_tree: Some(usage_tree),
956                has_unprocessed_messages: Some(has_unprocessed_messages),
957                reader_state,
958            }))
959        }));
960
961        Ok(())
962    }
963
964    /// Load required prev states and prev queue diff hashes
965    async fn load_states_and_diffs(
966        ref_by_mc_seqno: u32,
967        state_node_adapter: Arc<dyn StateNodeAdapter>,
968        prev_blocks_ids: Vec<BlockId>,
969    ) -> Result<(Vec<ShardStateStuff>, Vec<HashBytes>)> {
970        // otherwise await prev states by prev block ids
971        let load_state_fut: JoinTask<Result<Vec<ShardStateStuff>>> = JoinTask::new({
972            let state_node_adapter = state_node_adapter.clone();
973            let prev_blocks_ids = prev_blocks_ids.clone();
974            let span = tracing::Span::current();
975            async move {
976                Self::load_prev_states(
977                    ref_by_mc_seqno,
978                    state_node_adapter.as_ref(),
979                    &prev_blocks_ids,
980                    Default::default(),
981                )
982                .await
983            }
984            .instrument(span)
985        });
986
987        let prev_hashes_fut: JoinTask<Result<Vec<HashBytes>>> = JoinTask::new({
988            let span = tracing::Span::current();
989            async move {
990                let mut prev_hashes = vec![];
991                for prev_block_id in prev_blocks_ids {
992                    // request state for prev block and wait for response
993                    if let Some(diff) = state_node_adapter.load_diff(&prev_block_id).await? {
994                        tracing::debug!(target: tracing_targets::COLLATOR,
995                            "loaded queue diff for prev_block_id {}",
996                            prev_block_id.as_short_id(),
997                        );
998                        prev_hashes.push(*diff.diff_hash());
999                    }
1000                }
1001                Ok(prev_hashes)
1002            }
1003            .instrument(span)
1004        });
1005
1006        let (prev_states, prev_hash) =
1007            futures_util::future::join(load_state_fut, prev_hashes_fut).await;
1008
1009        Ok((prev_states?, prev_hash?))
1010    }
1011
1012    async fn load_prev_states(
1013        prev_mc_seqno: u32,
1014        state_node_adapter: &dyn StateNodeAdapter,
1015        prev_blocks_ids: &[BlockId],
1016        hint: LoadStateHint,
1017    ) -> Result<Vec<ShardStateStuff>> {
1018        let mut prev_states = vec![];
1019        for prev_block_id in prev_blocks_ids {
1020            let state = state_node_adapter
1021                .load_state(prev_mc_seqno, prev_block_id, hint)
1022                .await?;
1023            tracing::debug!(target: tracing_targets::COLLATOR,
1024                "loaded prev shard state for prev_block_id {}",
1025                prev_block_id.as_short_id(),
1026            );
1027            prev_states.push(state);
1028        }
1029        Ok(prev_states)
1030    }
1031
1032    /// Build working state structure:
1033    /// * master state
1034    /// * observable previous state
1035    /// * usage tree that tracks data access to state cells
1036    ///
1037    /// Perform some validations on state
1038    fn build_and_validate_init_working_state(
1039        mc_data: Arc<McData>,
1040        prev_states: Vec<ShardStateStuff>,
1041        prev_queue_diff_hashes: Vec<HashBytes>,
1042    ) -> Result<Box<WorkingState>> {
1043        // TODO: consider split/merge
1044
1045        let (prev_shard_data, usage_tree) = PrevData::build(prev_states, prev_queue_diff_hashes)?;
1046
1047        let next_block_id_short = calc_next_block_id_short(prev_shard_data.blocks_ids());
1048
1049        let collation_config = Arc::new(mc_data.config.get_collation_config()?);
1050
1051        Ok(Box::new(WorkingState {
1052            next_block_id_short,
1053            mc_data,
1054            wu_used_from_last_anchor: prev_shard_data.wu_used_from_last_anchor(),
1055            resume_collation_elapsed: Duration::ZERO,
1056            reader_state: ReaderState::new(prev_shard_data.processed_upto()),
1057            prev_shard_data: Some(prev_shard_data),
1058            usage_tree: Some(usage_tree),
1059            has_unprocessed_messages: None,
1060            collation_config,
1061        }))
1062    }
1063
1064    fn get_anchors_processing_info(
1065        shard_id: &ShardIdent,
1066        mc_data: &McData,
1067        prev_block_id: &BlockId,
1068        prev_gen_chain_time: u64,
1069        prev_externals_processed_to: (MempoolAnchorId, u64),
1070    ) -> Option<AnchorsProcessingInfo> {
1071        // for shard
1072        // if [sb:01][mb:01][mb:02]
1073        //      then get processing info from [mb:02]
1074        // if [mb:01]
1075        //      then from [mb:01]
1076        let mut from_mc_info_opt = None;
1077        if !shard_id.is_masterchain() {
1078            let (mc_processed_to_anchor_id, mc_processed_to_msgs_offset) = mc_data
1079                .processed_upto
1080                .get_min_externals_processed_to()
1081                .unwrap_or_default();
1082            // on zerostate the last processed to anchor in the master will be 0, skip it
1083            if mc_processed_to_anchor_id > 0 {
1084                // TODO: consider split/merge
1085
1086                // get from mc data if prev shard block is equal to the top shard
1087                // and top shard was not updated in master
1088                // it means that no shard blocks were collated between masters
1089                // because there were no messages for processing
1090                // and we can omit top processed anchor from shard
1091                // if it is lower then top processed from master
1092                for (top_shard_id, top_shard_descr) in mc_data.shards.iter() {
1093                    if shard_id == top_shard_id {
1094                        if prev_block_id.seqno == top_shard_descr.seqno
1095                            && !top_shard_descr.top_sc_block_updated
1096                        {
1097                            from_mc_info_opt = Some(AnchorsProcessingInfo {
1098                                processed_to_anchor_id: mc_processed_to_anchor_id,
1099                                processed_to_msgs_offset: mc_processed_to_msgs_offset,
1100                                last_imported_chain_time: mc_data.gen_chain_time,
1101                                last_imported_in_block_id: mc_data.block_id,
1102                                current_shard_last_imported_chain_time: prev_gen_chain_time,
1103                            });
1104                        }
1105                        break;
1106                    }
1107                }
1108            }
1109        }
1110
1111        // get processing info from previous data
1112        // for shard
1113        // if [sb:01][mb:01]
1114        //      then from [sb:01]
1115        // if [sb:01][mb:01][sb:02]
1116        //      then from [sb:02]
1117        // if [mb:01]
1118        //      then None
1119        // for master - always from the previous master block or None on zerostate
1120        let from_prev_info_opt = match prev_externals_processed_to {
1121            // on zerostate the last processed to anchor in the shard will be 0, skip it
1122            (processed_to_anchor_id, processed_to_msgs_offset) if processed_to_anchor_id > 0 => {
1123                Some(AnchorsProcessingInfo {
1124                    processed_to_anchor_id,
1125                    processed_to_msgs_offset,
1126                    last_imported_chain_time: prev_gen_chain_time,
1127                    last_imported_in_block_id: *prev_block_id,
1128                    current_shard_last_imported_chain_time: prev_gen_chain_time,
1129                })
1130            }
1131            _ => None,
1132        };
1133
1134        // choose the higher one
1135        match (from_mc_info_opt, from_prev_info_opt) {
1136            // for shard
1137            // if [sb:01][mb:01][mb:02]
1138            //      if [mb:02] processed anchors ahead of [sb:01]
1139            //          then get info from [mb:02]
1140            //      if [sb:01] processed anchors ahead of [mb:01]
1141            //          then get info from [sb:01]
1142            (Some(from_mc_info), Some(from_prev_info)) => {
1143                if from_mc_info.processed_to_anchor_id > from_prev_info.processed_to_anchor_id
1144                    || (from_mc_info.processed_to_anchor_id
1145                        == from_prev_info.processed_to_anchor_id
1146                        && from_mc_info.processed_to_msgs_offset
1147                            > from_prev_info.processed_to_msgs_offset)
1148                {
1149                    Some(from_mc_info)
1150                } else {
1151                    Some(from_prev_info)
1152                }
1153            }
1154            // for shard
1155            // if [mb:01]
1156            //      then from [mb:01]
1157            (from_mc_info_opt, None) => from_mc_info_opt,
1158            // for shard
1159            // if [sb:01][mb:01]
1160            //      then from [sb:01]
1161            // if [sb:01][mb:01][sb:02]
1162            //      then from [sb:02]
1163            // from master - always from previous master
1164            (None, from_prev_info_opt) => from_prev_info_opt,
1165        }
1166    }
1167
1168    /// 1. Get last imported anchor id from cache
1169    /// 2. Check if should skip import by `max_consensus_lag_rounds * 2/3`
1170    /// 3. Import anyway if `force_anchor_import == true`
1171    /// 4. Await next anchor via mempool adapter
1172    /// 5. Store anchor in cache and return it
1173    async fn import_next_anchor(
1174        shard_id: ShardIdent,
1175        anchors_cache: &mut AnchorsCache,
1176        mpool_adapter: Arc<dyn MempoolAdapter>,
1177        top_processed_to_anchor: MempoolAnchorId,
1178        max_consensus_lag_rounds: u32,
1179        force_anchor_import: bool,
1180    ) -> Result<ImportNextAnchor> {
1181        let labels = [("workchain", shard_id.workchain().to_string())];
1182
1183        let _histogram =
1184            HistogramGuardWithLabels::begin("tycho_collator_import_next_anchor_time_high", &labels);
1185
1186        let timer = std::time::Instant::now();
1187
1188        let (prev_anchor_id, ct) = anchors_cache
1189            .get_last_imported_anchor_id_and_ct()
1190            .unwrap_or_default();
1191
1192        // do not import anchor if mempool may be paused
1193        // needs to process more anchors in collator first
1194        if !force_anchor_import
1195            && prev_anchor_id.saturating_sub(top_processed_to_anchor)
1196                > max_anchors_processing_lag_rounds(max_consensus_lag_rounds)
1197        {
1198            metrics::counter!("tycho_collator_anchor_import_skipped_count", &labels).increment(1);
1199            return Ok(ImportNextAnchor::Skipped);
1200        }
1201
1202        let requested_at = now_millis();
1203
1204        let get_anchor_result = mpool_adapter.get_next_anchor(prev_anchor_id).await?;
1205
1206        let has_our_externals = match &get_anchor_result {
1207            GetAnchorResult::Exist(next_anchor) => {
1208                let our_exts_count = next_anchor.count_externals_for(&shard_id, 0);
1209                anchors_cache.add(next_anchor.clone(), our_exts_count);
1210
1211                let has_externals = our_exts_count > 0;
1212
1213                metrics::counter!("tycho_collator_ext_msgs_imported_count", &labels)
1214                    .increment(our_exts_count as _);
1215                metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
1216                    .increment(our_exts_count as f64);
1217
1218                let chain_time_elapsed = next_anchor.chain_time.saturating_sub(ct);
1219
1220                tracing::info!(target: tracing_targets::COLLATOR,
1221                    elapsed = timer.elapsed().as_millis(),
1222                    chain_time_elapsed,
1223                    "imported next anchor (id: {}, chain_time: {}, all_exts: {}, our_exts: {})",
1224                    next_anchor.id,
1225                    next_anchor.chain_time,
1226                    next_anchor.externals.len(),
1227                    our_exts_count,
1228                );
1229
1230                has_externals
1231            }
1232            GetAnchorResult::NotExist => false,
1233        };
1234
1235        Ok(ImportNextAnchor::Result {
1236            prev_anchor_id,
1237            get_anchor_result,
1238            has_our_externals,
1239            requested_at,
1240        })
1241    }
1242
1243    /// 1. Get `processed_to` anchor
1244    /// 2. Get next anchors until `last_block_chain_time`
1245    /// 3. Store anchors in cache
1246    pub(self) async fn import_init_anchors(
1247        processed_to_anchor_id: MempoolAnchorId,
1248        processed_to_msgs_offset: u64,
1249        last_block_chain_time: u64,
1250        current_shard_last_imported_chain_time: u64,
1251        shard_id: ShardIdent,
1252        anchors_cache: &mut AnchorsCache,
1253        mpool_adapter: Arc<dyn MempoolAdapter>,
1254    ) -> Result<ImportInitAnchorsResult, CollatorError> {
1255        let labels = [("workchain", shard_id.workchain().to_string())];
1256
1257        let mut res = ImportInitAnchorsResult::default();
1258
1259        let mut last_anchor = None;
1260        let mut all_anchors_are_taken_from_cache = false;
1261        let mut processed_to_anchor_exists_in_cache = false;
1262
1263        // remove all anchors above last_block_chain_time
1264        anchors_cache.remove_last_imported_above(last_block_chain_time);
1265
1266        // look for required anchors in cache
1267        for anchor in anchors_cache.iter().map(|(_, ca)| &ca.anchor) {
1268            if anchor.id >= processed_to_anchor_id {
1269                // check if processed_to anchor exists in cache
1270                if anchor.id == processed_to_anchor_id {
1271                    processed_to_anchor_exists_in_cache = true;
1272
1273                    // skip fully read processed_to anchor
1274                    if anchor.externals.len() == processed_to_msgs_offset as usize {
1275                        continue;
1276                    }
1277                }
1278
1279                // NOTE: in case when we collated block 1 and now collating block 2
1280                //      and block 1 mismatched with received from bc,
1281                //      then we will collate block 2 again based on received block 1,
1282                //      but we may already have removed processed_to anchor
1283                //      from cache during the previous collation attempt of block 2,
1284                //      so we cannot use cache and should fully refill it
1285
1286                // use anchors from cache only when processed_to anchor exists in cache
1287                if !processed_to_anchor_exists_in_cache {
1288                    break;
1289                }
1290
1291                // when we found anchor with last_block_chain_time
1292                // it means that we have all required anchors in the cache
1293                if anchor.chain_time == last_block_chain_time {
1294                    all_anchors_are_taken_from_cache = true;
1295                }
1296
1297                // collect required anchors
1298                let our_exts_count = anchor.count_externals_for(&shard_id, 0);
1299                res.anchors_info
1300                    .push(InitAnchorSource::FromCache(AnchorInfo::from_anchor(
1301                        anchor,
1302                        our_exts_count,
1303                    )));
1304
1305                last_anchor = Some(anchor.clone());
1306            }
1307        }
1308
1309        // return if we have all required anchors in cache
1310        if all_anchors_are_taken_from_cache {
1311            let Some(InitAnchorSource::FromCache(_)) = res.anchors_info.last() else {
1312                return Err(CollatorError::Anyhow(anyhow::anyhow!(
1313                    "`anchors_info` should contain almost one `FromCache` here"
1314                )));
1315            };
1316
1317            return Ok(res);
1318        }
1319
1320        // clear cache if processed_to_anchor_id does not exist in cache
1321        if !processed_to_anchor_exists_in_cache {
1322            anchors_cache.clear();
1323            metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels).set(0);
1324        }
1325
1326        let mut prev_anchor_id;
1327        let mut last_imported_anchor_ct;
1328        let mut next_anchor = match last_anchor {
1329            Some(anchor) => {
1330                prev_anchor_id = anchor.id;
1331                last_imported_anchor_ct = anchor.chain_time;
1332                None
1333            }
1334            None => {
1335                let GetAnchorResult::Exist(anchor) = mpool_adapter
1336                    .get_anchor_by_id(processed_to_anchor_id)
1337                    .await?
1338                else {
1339                    return Err(CollatorError::Cancelled(
1340                        CollationCancelReason::AnchorNotFound(processed_to_anchor_id),
1341                    ));
1342                };
1343                prev_anchor_id = anchor.id;
1344                last_imported_anchor_ct = anchor.chain_time;
1345                Some(anchor)
1346            }
1347        };
1348
1349        loop {
1350            // add loaded anchor to cache
1351            if let Some(anchor) = next_anchor {
1352                let our_exts_count = anchor.count_externals_for(&shard_id, 0);
1353
1354                // do not add to cache fully read processed_to anchor
1355                if anchor.id == processed_to_anchor_id
1356                    && anchor.externals.len() == processed_to_msgs_offset as usize
1357                {
1358                    // only add imported anchor info
1359                    anchors_cache
1360                        .add_imported_anchor_info(AnchorInfo::from_anchor(&anchor, our_exts_count));
1361                } else {
1362                    // otherwise add imported anchor info and anchor itself
1363                    anchors_cache.add(anchor.clone(), our_exts_count);
1364
1365                    metrics::counter!("tycho_collator_ext_msgs_imported_count", &labels)
1366                        .increment(our_exts_count as u64);
1367                    metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
1368                        .increment(our_exts_count as f64);
1369                }
1370
1371                // anyway anchor has been imported
1372                res.anchors_info
1373                    .push(InitAnchorSource::Imported(AnchorInfo::from_anchor(
1374                        &anchor,
1375                        our_exts_count,
1376                    )));
1377            }
1378
1379            // count number of anchors imported above last chain time for current shard
1380            if last_imported_anchor_ct > current_shard_last_imported_chain_time {
1381                res.anchors_count_above_last_imported_in_current_shard += 1;
1382            }
1383
1384            // import next anchor if last block chain time not reached
1385            if last_imported_anchor_ct >= last_block_chain_time {
1386                break;
1387            }
1388
1389            let GetAnchorResult::Exist(anchor) =
1390                mpool_adapter.get_next_anchor(prev_anchor_id).await?
1391            else {
1392                return Err(CollatorError::Cancelled(
1393                    CollationCancelReason::NextAnchorNotFound(prev_anchor_id),
1394                ));
1395            };
1396            prev_anchor_id = anchor.id;
1397            last_imported_anchor_ct = anchor.chain_time;
1398
1399            next_anchor = Some(anchor);
1400        }
1401
1402        Ok(res)
1403    }
1404
1405    fn check_has_unprocessed_messages(&mut self, working_state: &mut WorkingState) -> Result<bool> {
1406        if let Some(has_messages) = working_state.has_unprocessed_messages {
1407            return Ok(has_messages);
1408        }
1409
1410        // when we have processed offset > 0 in externals or externals
1411        // then we should have uprocessed messages in buffer
1412        if working_state
1413            .reader_state
1414            .check_has_non_zero_processed_offset()
1415        {
1416            working_state.has_unprocessed_messages = Some(true);
1417            return Ok(true);
1418        }
1419
1420        // if all internals and externals read
1421        // then check if any buffer contain messages
1422        if working_state.reader_state.has_messages_in_buffers() {
1423            working_state.has_unprocessed_messages = Some(true);
1424            return Ok(true);
1425        }
1426
1427        // finally check if has pending messages in iterators
1428
1429        let msgs_exec_params = MsgsExecutionParamsStuff::create(
1430            working_state
1431                .prev_shard_data_ref()
1432                .processed_upto()
1433                .msgs_exec_params
1434                .clone(),
1435            working_state.collation_config.msgs_exec_params.clone(),
1436        );
1437
1438        // create temporary empty anchors cache for checking pending messages
1439        let mut temp_anchors_cache = AnchorsCache::default();
1440
1441        let mut tx = AnchorsCacheTransaction::new(&mut temp_anchors_cache);
1442
1443        // Extract values before creating MessagesReaderContext to avoid borrow conflicts
1444        let for_shard_id = working_state.next_block_id_short.shard;
1445        let block_seqno = working_state.next_block_id_short.seqno;
1446        let mc_state_gen_lt = working_state.mc_data.gen_lt;
1447        let prev_state_gen_lt = working_state.prev_shard_data_ref().gen_lt();
1448        let mc_top_shards_end_lts: Vec<_> = working_state
1449            .mc_data
1450            .shards
1451            .iter()
1452            .map(|(k, v)| (*k, v.end_lt))
1453            .collect();
1454        let is_first = is_first_block_after_prev_master(
1455            working_state.prev_shard_data_ref().blocks_ids()[0], // TODO: consider split/merge
1456            &working_state.mc_data.shards,
1457        );
1458
1459        // create reader
1460        let mut messages_reader = MessagesReader::new(
1461            MessagesReaderContext {
1462                for_shard_id,
1463                block_seqno,
1464                next_chain_time: 0,
1465                msgs_exec_params,
1466                mc_state_gen_lt,
1467                prev_state_gen_lt,
1468                mc_top_shards_end_lts,
1469                cumulative_stats_calc_params: None,
1470                // pass reader state by reference
1471                reader_state: &mut working_state.reader_state,
1472                // do not use anchors cache because we need to check
1473                // only for pending internals in iterators
1474                anchors_cache: &mut tx,
1475                is_first_block_after_prev_master: is_first,
1476                part_stat_ranges: None,
1477            },
1478            self.mq_adapter.clone(),
1479        )?;
1480
1481        // check if has pending internals
1482        let has_pending_internals = messages_reader.check_has_pending_internals_in_iterators()?;
1483
1484        // drop created next internals range readers for master
1485        // because shard end_lt does not include updated top shard block info
1486        if working_state.next_block_id_short.is_masterchain() {
1487            messages_reader.drop_internals_next_range_readers();
1488        } else if !has_pending_internals {
1489            // drop created next internals range readers for shard
1490            // when we do not have pending internals
1491            // because master can be changed on the next check and
1492            // range to boudary should be updated
1493            messages_reader.drop_internals_next_range_readers();
1494        }
1495
1496        // return reader state to working state
1497        let _ = messages_reader.finalize(
1498            0, // can pass 0 because new messages reader was not initialized in this case
1499            &Default::default(),
1500        )?;
1501
1502        working_state.has_unprocessed_messages = Some(has_pending_internals);
1503
1504        Ok(has_pending_internals)
1505    }
1506
1507    async fn wait_state_and_try_collate_wrapper(&mut self) -> Result<()> {
1508        Box::pin(self.wait_state_and_try_collate(false))
1509            .await
1510            .with_context(|| format!("next_block_id: {}", self.next_block_info))
1511    }
1512
1513    async fn wait_state_and_try_collate(&mut self, force_one_anchor_import: bool) -> Result<()> {
1514        let working_state = self.delayed_working_state.wait().await?;
1515
1516        if self.shard_id.is_masterchain() {
1517            Box::pin(
1518                self.try_collate_next_master_block_impl(working_state, force_one_anchor_import),
1519            )
1520            .await
1521        } else {
1522            Box::pin(self.try_collate_next_shard_block_impl(working_state, force_one_anchor_import))
1523                .await
1524        }
1525    }
1526
1527    async fn wait_state_and_do_collate_wrapper(
1528        &mut self,
1529        top_shard_blocks_info: Vec<TopBlockDescription>,
1530        next_chain_time: u64,
1531    ) -> Result<()> {
1532        Box::pin(self.wait_state_and_do_collate(top_shard_blocks_info, next_chain_time))
1533            .await
1534            .with_context(|| format!("next_block_id: {}", self.next_block_info))
1535    }
1536
1537    async fn wait_state_and_do_collate(
1538        &mut self,
1539        top_shard_blocks_info: Vec<TopBlockDescription>,
1540        next_chain_time: u64,
1541    ) -> Result<()> {
1542        let mut working_state = self.delayed_working_state.wait().await?;
1543
1544        // if last imported anchor chain time is less then next required
1545        // then we should import next anchors until we reach required chain time
1546        let (mut last_imported_anchor_id, mut last_imported_chain_time) = self
1547            .anchors_cache
1548            .get_last_imported_anchor_id_and_ct()
1549            .context("should contain at least one imported anchor info here")?;
1550        if last_imported_chain_time < next_chain_time {
1551            let labels = [("workchain", self.shard_id.workchain().to_string())];
1552            let top_processed_to_anchor = working_state.mc_data.top_processed_to_anchor;
1553            let max_consensus_lag_rounds = working_state
1554                .mc_data
1555                .config
1556                .get_consensus_config()?
1557                .max_consensus_lag_rounds
1558                .get() as u32;
1559            while last_imported_chain_time < next_chain_time {
1560                // next anchor importing can stuck if mempool paused
1561                // so allow to cancel collation here
1562                let collation_cancelled = self.cancel_collation.notified();
1563                let import_fut = Self::import_next_anchor(
1564                    self.shard_id,
1565                    &mut self.anchors_cache,
1566                    self.mpool_adapter.clone(),
1567                    top_processed_to_anchor,
1568                    max_consensus_lag_rounds,
1569                    false,
1570                );
1571
1572                let import_anchor_result = tokio::select! {
1573                    res = import_fut => res?,
1574                    _ = collation_cancelled => {
1575                        tracing::info!(target: tracing_targets::COLLATOR,
1576                            top_processed_to_anchor,
1577                            last_imported_anchor_id,
1578                            last_imported_chain_time,
1579                            "collation was cancelled by manager on wait_state_and_do_collate",
1580                        );
1581                        metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels).increment(1);
1582                        self.listener
1583                            .on_cancelled(
1584                                working_state.mc_data.block_id,
1585                                working_state.next_block_id_short,
1586                                CollationCancelReason::ExternalCancel,
1587                            )
1588                            .await?;
1589                        self.delayed_working_state.delay(working_state);
1590                        return Ok(());
1591                    }
1592                };
1593
1594                match import_anchor_result {
1595                    ImportNextAnchor::Skipped => {
1596                        anyhow::bail!(
1597                            "anchor import cannot be skipped here because anchor \
1598                            with next_chain_time {} should exit",
1599                            next_chain_time,
1600                        )
1601                    }
1602                    ImportNextAnchor::Result {
1603                        get_anchor_result,
1604                        has_our_externals,
1605                        ..
1606                    } => match get_anchor_result {
1607                        GetAnchorResult::NotExist => {
1608                            anyhow::bail!(
1609                                "anchor with next_chain_time {} should exit",
1610                                next_chain_time
1611                            )
1612                        }
1613                        GetAnchorResult::Exist(next_anchor) => {
1614                            // time elapsed from prev anchor
1615                            let elapsed_from_prev_anchor = self.anchor_timer.elapsed();
1616                            self.anchor_timer = std::time::Instant::now();
1617                            metrics::histogram!(
1618                                "tycho_collator_from_prev_anchor_time_high",
1619                                &labels
1620                            )
1621                            .record(elapsed_from_prev_anchor);
1622
1623                            working_state.wu_used_from_last_anchor = 0;
1624
1625                            // time between anchors
1626                            let elapsed_between_anchors = next_anchor
1627                                .chain_time
1628                                .saturating_sub(last_imported_chain_time);
1629                            metrics::histogram!(
1630                                "tycho_collator_between_anchors_time_high",
1631                                &labels,
1632                            )
1633                            .record(Duration::from_millis(elapsed_between_anchors));
1634
1635                            last_imported_anchor_id = next_anchor.id;
1636                            last_imported_chain_time = next_anchor.chain_time;
1637
1638                            tracing::debug!(target: tracing_targets::COLLATOR,
1639                                top_processed_to_anchor,
1640                                last_imported_anchor_id,
1641                                last_imported_chain_time,
1642                                has_externals = has_our_externals,
1643                                "imported next anchor to reach next_chain_time",
1644                            );
1645                        }
1646                    },
1647                }
1648            }
1649        }
1650
1651        Box::pin(self.do_collate(
1652            working_state,
1653            Some(top_shard_blocks_info),
1654            next_chain_time,
1655            ForceMasterCollation::No,
1656        ))
1657        .await
1658    }
1659
1660    /// Run collation if there are internals,
1661    /// otherwise import next anchor and notify it to manager
1662    /// that will route next collation steps
1663    #[tracing::instrument(
1664        parent = None, name = "try_collate_next_master_block",
1665        skip_all, fields(
1666            next_block_id = %self.next_block_info,
1667            mc_data_block_id = %working_state.mc_data.block_id.as_short_id(),
1668        )
1669    )]
1670    async fn try_collate_next_master_block_impl(
1671        &mut self,
1672        mut working_state: Box<WorkingState>,
1673        mut force_one_anchor_import: bool,
1674    ) -> Result<()> {
1675        tracing::debug!(target: tracing_targets::COLLATOR,
1676            force_one_anchor_import,
1677            "Check if can collate next master block",
1678        );
1679
1680        let labels = [("workchain", self.shard_id.workchain().to_string())];
1681        let _histogram = HistogramGuardWithLabels::begin(
1682            "tycho_collator_try_collate_next_master_block_time",
1683            &labels,
1684        );
1685
1686        let top_processed_to_anchor = working_state.mc_data.top_processed_to_anchor;
1687
1688        let (mut last_imported_anchor_id, mut last_imported_chain_time) = self
1689            .anchors_cache
1690            .get_last_imported_anchor_id_and_ct()
1691            .unwrap_or_default();
1692
1693        // check if has unprocessed messages in buffer or queue
1694        let has_unprocessed_messages = self.check_has_unprocessed_messages(&mut working_state)?;
1695
1696        // check if should force master collation by unprocessed messages
1697        let force_mc_collation =
1698            has_unprocessed_messages.then_some(ForceMasterCollation::ByUnprocessedMessages);
1699
1700        match (has_unprocessed_messages, force_one_anchor_import) {
1701            (true, false) => {
1702                // do not import next anchor and force collation
1703                tracing::info!(target: tracing_targets::COLLATOR,
1704                    top_processed_to_anchor,
1705                    last_imported_anchor_id,
1706                    last_imported_chain_time,
1707                    "there are unprocessed messages from previous block, will collate next block",
1708                );
1709
1710                self.listener
1711                    .on_skipped(
1712                        working_state.mc_data.block_id,
1713                        working_state.next_block_id_short,
1714                        working_state.prev_shard_data_ref().gen_chain_time(),
1715                        force_mc_collation.expect(
1716                            "should be Some(..) here because of `has_unprocessed_messages = true`",
1717                        ),
1718                        working_state.collation_config.clone(),
1719                    )
1720                    .await?;
1721                self.delayed_working_state.delay(working_state);
1722                return Ok(());
1723            }
1724            (false, _) => {
1725                tracing::debug!(target: tracing_targets::COLLATOR,
1726                    top_processed_to_anchor,
1727                    last_imported_anchor_id,
1728                    last_imported_chain_time,
1729                    "there are no unprocessed messages, will import next anchor",
1730                );
1731            }
1732            (_, true) => {
1733                tracing::info!(target: tracing_targets::COLLATOR,
1734                    force_one_anchor_import,
1735                    "will import next anchor",
1736                );
1737            }
1738        }
1739
1740        // import next anchor and check if can collate in manager
1741
1742        // next anchor importing can stuck if mempool paused
1743        // so allow to cancel collation here
1744        let collation_cancelled = self.cancel_collation.notified();
1745        let import_fut = Self::import_next_anchor(
1746            self.shard_id,
1747            &mut self.anchors_cache,
1748            self.mpool_adapter.clone(),
1749            working_state.mc_data.top_processed_to_anchor,
1750            working_state
1751                .mc_data
1752                .config
1753                .get_consensus_config()?
1754                .max_consensus_lag_rounds
1755                .get() as u32,
1756            std::mem::take(&mut force_one_anchor_import),
1757        );
1758
1759        let import_anchor_result = tokio::select! {
1760            res = import_fut => res?,
1761            _ = collation_cancelled => {
1762                tracing::info!(target: tracing_targets::COLLATOR,
1763                    top_processed_to_anchor,
1764                    last_imported_anchor_id,
1765                    last_imported_chain_time,
1766                    "collation was cancelled by manager on try_collate_next_master_block",
1767                );
1768                metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels).increment(1);
1769                self.listener
1770                    .on_cancelled(
1771                        working_state.mc_data.block_id,
1772                        working_state.next_block_id_short,
1773                        CollationCancelReason::ExternalCancel,
1774                    )
1775                    .await?;
1776                self.delayed_working_state.delay(working_state);
1777                return Ok(());
1778            }
1779        };
1780
1781        match import_anchor_result {
1782            ImportNextAnchor::Result {
1783                prev_anchor_id,
1784                get_anchor_result,
1785                has_our_externals,
1786                ..
1787            } => match get_anchor_result {
1788                GetAnchorResult::NotExist => {
1789                    tracing::warn!(target: tracing_targets::COLLATOR,
1790                        top_processed_to_anchor,
1791                        last_imported_anchor_id,
1792                        last_imported_chain_time,
1793                        "next anchor not exist, cancel collation attempts",
1794                    );
1795                    self.listener
1796                        .on_cancelled(
1797                            working_state.mc_data.block_id,
1798                            working_state.next_block_id_short,
1799                            CollationCancelReason::NextAnchorNotFound(prev_anchor_id),
1800                        )
1801                        .await?;
1802                    self.delayed_working_state.delay(working_state);
1803                    return Ok(());
1804                }
1805                GetAnchorResult::Exist(next_anchor) => {
1806                    // time elapsed from prev anchor
1807                    let elapsed_from_prev_anchor = self.anchor_timer.elapsed();
1808                    self.anchor_timer = std::time::Instant::now();
1809                    metrics::histogram!("tycho_collator_from_prev_anchor_time_high", &labels)
1810                        .record(elapsed_from_prev_anchor);
1811
1812                    working_state.wu_used_from_last_anchor = 0;
1813
1814                    // time between anchors
1815                    let elapsed_between_anchors = next_anchor.chain_time - last_imported_chain_time;
1816                    metrics::histogram!("tycho_collator_between_anchors_time_high", &labels,)
1817                        .record(Duration::from_millis(elapsed_between_anchors));
1818
1819                    last_imported_anchor_id = next_anchor.id;
1820                    last_imported_chain_time = next_anchor.chain_time;
1821
1822                    tracing::debug!(target: tracing_targets::COLLATOR,
1823                        force_mc_block = false,
1824                        top_processed_to_anchor,
1825                        last_imported_anchor_id,
1826                        last_imported_chain_time,
1827                        has_externals = has_our_externals,
1828                        "imported next anchor, will notify collation manager",
1829                    );
1830
1831                    // this may start master block collation or cause next anchor import
1832                    self.listener
1833                        .on_skipped(
1834                            working_state.mc_data.block_id,
1835                            working_state.next_block_id_short,
1836                            next_anchor.chain_time,
1837                            force_mc_collation.unwrap_or(ForceMasterCollation::No),
1838                            working_state.collation_config.clone(),
1839                        )
1840                        .await?;
1841
1842                    self.delayed_working_state.delay(working_state);
1843                }
1844            },
1845            ImportNextAnchor::Skipped => {
1846                tracing::debug!(target: tracing_targets::COLLATOR,
1847                    force_mc_block = true,
1848                    top_processed_to_anchor,
1849                    last_imported_anchor_id,
1850                    last_imported_chain_time,
1851                    "mempool paused, will notify collation manager",
1852                );
1853
1854                // this may start master block collation or cause next anchor import
1855                self.listener
1856                    .on_skipped(
1857                        working_state.mc_data.block_id,
1858                        working_state.next_block_id_short,
1859                        last_imported_chain_time,
1860                        force_mc_collation.unwrap_or(ForceMasterCollation::ByAnchorImportSkipped),
1861                        working_state.collation_config.clone(),
1862                    )
1863                    .await?;
1864
1865                self.delayed_working_state.delay(working_state);
1866            }
1867        }
1868
1869        Ok(())
1870    }
1871
1872    /// Run collation if there are internals or externals,
1873    /// otherwise import next anchor.
1874    /// Import next anchor if wu used exceeded limit or if one import forced.
1875    /// If it has externals then run collation,
1876    /// otherwise skip collation and notify collation manager.
1877    /// Skip collation and notify collation manager
1878    /// if max uncommitted chain length reached.
1879    #[tracing::instrument(
1880        parent = None, name = "try_collate_next_shard_block",
1881        skip_all, fields(
1882            next_block_id = %self.next_block_info,
1883            mc_data_block_id = %working_state.mc_data.block_id.as_short_id(),
1884        )
1885    )]
1886    async fn try_collate_next_shard_block_impl(
1887        &mut self,
1888        mut working_state: Box<WorkingState>,
1889        mut force_one_anchor_import: bool,
1890    ) -> Result<()> {
1891        tracing::debug!(target: tracing_targets::COLLATOR,
1892            force_one_anchor_import,
1893            "Check if can collate next shard block",
1894        );
1895
1896        let labels = [("workchain", self.shard_id.workchain().to_string())];
1897        let histogram = HistogramGuardWithLabels::begin(
1898            "tycho_collator_try_collate_next_shard_block_time",
1899            &labels,
1900        );
1901
1902        let top_processed_to_anchor = working_state.mc_data.top_processed_to_anchor;
1903
1904        // check if should force master block collation by uncommitted chain length
1905        let mut last_committed_seqno = 0;
1906        for (shard_id, shard_descr) in working_state.mc_data.shards.iter() {
1907            if *shard_id == self.shard_id {
1908                last_committed_seqno = shard_descr.seqno;
1909            }
1910        }
1911        let uncommitted_chain_length =
1912            working_state.next_block_id_short.seqno - 1 - last_committed_seqno;
1913        let max_uncommitted_chain_length =
1914            working_state.collation_config.max_uncommitted_chain_length as u32;
1915
1916        let force_mc_block_by_uncommitted_chain =
1917            uncommitted_chain_length >= max_uncommitted_chain_length;
1918
1919        // check if should collate next shard block, import anchors if required
1920        #[derive(Debug)]
1921        enum TryCollateCheck {
1922            ForceMcBlockByUncommittedChainLength,
1923            HasUnprocessedMessages,
1924            HasExternals,
1925            NoPendingMessages,
1926            ForceEmptyShardBlock,
1927        }
1928        let mut anchor_import_skipped = false;
1929        let mut try_collate_check = 'check: {
1930            {
1931                // check has unprocessed messages in buffer or queue
1932                let has_uprocessed_messages =
1933                    self.check_has_unprocessed_messages(&mut working_state)?;
1934
1935                // check pending externals
1936                let has_externals = self.anchors_cache.has_pending_externals();
1937
1938                // calculate mc block max interval threshold after the previous mc block
1939                let mc_block_max_interval_threshold = working_state.mc_data.gen_chain_time
1940                    + working_state.collation_config.mc_block_max_interval_ms as u64;
1941
1942                // check if should import anchor after fixed wu used by blocks collation
1943                let wu_used_from_last_anchor = working_state.wu_used_from_last_anchor;
1944                let wu_used_to_import_next_anchor =
1945                    working_state.collation_config.wu_used_to_import_next_anchor;
1946                let should_import_anchor_by_used_wu =
1947                    wu_used_from_last_anchor >= wu_used_to_import_next_anchor;
1948
1949                // report wu used related metrics
1950                metrics::gauge!("tycho_collator_wu_used_from_last_anchor", &[(
1951                    "type",
1952                    "wu_one_anchor"
1953                ),])
1954                .set(wu_used_to_import_next_anchor as f64);
1955                metrics::gauge!("tycho_collator_wu_used_from_last_anchor", &[(
1956                    "type",
1957                    "wu_used_sum"
1958                ),])
1959                .set(wu_used_from_last_anchor as f64);
1960                let can_import_anchors_count =
1961                    wu_used_from_last_anchor.saturating_div(wu_used_to_import_next_anchor);
1962                metrics::gauge!("tycho_collator_can_import_anchors_count")
1963                    .set(can_import_anchors_count as f64);
1964
1965                // decide if should import anchors
1966                let (mut last_imported_anchor_id, mut last_imported_chain_time) = self
1967                    .anchors_cache
1968                    .get_last_imported_anchor_id_and_ct()
1969                    .unwrap_or_default();
1970                match (
1971                    has_uprocessed_messages,
1972                    has_externals,
1973                    should_import_anchor_by_used_wu,
1974                    force_one_anchor_import,
1975                ) {
1976                    // When uncommitted chain limit is reached we still import anchors
1977                    // and later force master collation regardless of pending messages
1978                    _ if force_mc_block_by_uncommitted_chain => {
1979                        tracing::debug!(target: tracing_targets::COLLATOR,
1980                            top_processed_to_anchor,
1981                            last_imported_anchor_id,
1982                            last_imported_chain_time,
1983                            uncommitted_chain_length,
1984                            max_uncommitted_chain_length,
1985                            "uncommitted chain limit reached, will import next anchor and then force master collation",
1986                        );
1987                    }
1988                    (true, _, false, false) => {
1989                        break 'check TryCollateCheck::HasUnprocessedMessages;
1990                    }
1991                    (_, true, false, false) => break 'check TryCollateCheck::HasExternals,
1992                    (false, false, false, false) => {
1993                        tracing::debug!(target: tracing_targets::COLLATOR,
1994                            top_processed_to_anchor,
1995                            last_imported_anchor_id,
1996                            last_imported_chain_time,
1997                            "there are no pending internals or externals, will import next anchor",
1998                        );
1999
2000                        // drop used wu when anchor was not imported by used wu
2001                        working_state.wu_used_from_last_anchor = 0;
2002                    }
2003                    (_, _, true, false) => {
2004                        tracing::info!(target: tracing_targets::COLLATOR,
2005                            top_processed_to_anchor,
2006                            last_imported_anchor_id,
2007                            last_imported_chain_time,
2008                            "wu used from last anchor {} reached limit {} on length {}, will import next anchor",
2009                            wu_used_from_last_anchor, wu_used_to_import_next_anchor,  uncommitted_chain_length,
2010                        );
2011                    }
2012                    (_, _, _, true) => {
2013                        tracing::info!(target: tracing_targets::COLLATOR,
2014                            force_one_anchor_import,
2015                            "will import next anchor",
2016                        );
2017                    }
2018                }
2019
2020                let max_consensus_lag_rounds = working_state
2021                    .mc_data
2022                    .config
2023                    .get_consensus_config()?
2024                    .max_consensus_lag_rounds
2025                    .get() as u32;
2026
2027                let mut imported_anchors_count = 0;
2028                let mut imported_anchors_has_externals = false;
2029
2030                // import anchors until wu_used_from_last_anchor > wu_used_to_import_next_anchor
2031                loop {
2032                    // next anchor importing can stuck if mempool paused
2033                    // so allow to cancel collation here
2034                    let collation_cancelled = self.cancel_collation.notified();
2035                    let import_fut = Self::import_next_anchor(
2036                        self.shard_id,
2037                        &mut self.anchors_cache,
2038                        self.mpool_adapter.clone(),
2039                        working_state.mc_data.top_processed_to_anchor,
2040                        max_consensus_lag_rounds,
2041                        std::mem::take(&mut force_one_anchor_import),
2042                    );
2043
2044                    let import_anchor_result = tokio::select! {
2045                        res = import_fut => res?,
2046                        _ = collation_cancelled => {
2047                            tracing::info!(target: tracing_targets::COLLATOR,
2048                                top_processed_to_anchor,
2049                                last_imported_anchor_id,
2050                                last_imported_chain_time,
2051                                "collation was cancelled by manager on try_collate_next_shard_block",
2052                            );
2053                            metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels).increment(1);
2054                            self.listener
2055                                .on_cancelled(
2056                                    working_state.mc_data.block_id,
2057                                    working_state.next_block_id_short,
2058                                    CollationCancelReason::ExternalCancel,
2059                                )
2060                                .await?;
2061                            self.delayed_working_state.delay(working_state);
2062                            return Ok(());
2063                        }
2064                    };
2065
2066                    match import_anchor_result {
2067                        ImportNextAnchor::Result {
2068                            prev_anchor_id,
2069                            get_anchor_result,
2070                            has_our_externals,
2071                            requested_at,
2072                        } => match get_anchor_result {
2073                            GetAnchorResult::NotExist => {
2074                                // cancel collation attempts if mempool cannot return required anchor
2075                                tracing::warn!(target: tracing_targets::COLLATOR,
2076                                    top_processed_to_anchor,
2077                                    last_imported_anchor_id,
2078                                    last_imported_chain_time,
2079                                    "next anchor not exist, cancel collation attempts",
2080                                );
2081                                self.listener
2082                                    .on_cancelled(
2083                                        working_state.mc_data.block_id,
2084                                        working_state.next_block_id_short,
2085                                        CollationCancelReason::NextAnchorNotFound(prev_anchor_id),
2086                                    )
2087                                    .await?;
2088                                self.delayed_working_state.delay(working_state);
2089                                return Ok(());
2090                            }
2091                            GetAnchorResult::Exist(anchor) => {
2092                                imported_anchors_count += 1;
2093
2094                                // time elapsed from prev anchor
2095                                let elapsed_from_prev_anchor = self.anchor_timer.elapsed();
2096                                self.anchor_timer = std::time::Instant::now();
2097                                metrics::histogram!(
2098                                    "tycho_collator_from_prev_anchor_time_high",
2099                                    &labels
2100                                )
2101                                .record(elapsed_from_prev_anchor);
2102
2103                                // time between anchors
2104                                let elapsed_between_anchors =
2105                                    anchor.chain_time - last_imported_chain_time;
2106                                metrics::histogram!(
2107                                    "tycho_collator_between_anchors_time_high",
2108                                    &labels,
2109                                )
2110                                .record(Duration::from_millis(elapsed_between_anchors));
2111
2112                                last_imported_anchor_id = anchor.id;
2113                                last_imported_chain_time = anchor.chain_time;
2114
2115                                metrics::gauge!("tycho_collator_shard_blocks_count_btw_anchors")
2116                                    .set(self.shard_blocks_count_from_last_anchor);
2117                                self.shard_blocks_count_from_last_anchor = 0;
2118
2119                                // report anchor lag
2120                                let lag = work_units::MempoolAnchorLag {
2121                                    requested_at,
2122                                    chain_time: anchor.chain_time,
2123                                };
2124                                let prev_block_seqno = working_state.next_block_id_short.seqno - 1;
2125                                if let Some(sender) = &self.wu_tuner_event_sender {
2126                                    if let Err(err) = sender
2127                                        .send(work_units::WuEvent {
2128                                            shard: self.shard_id,
2129                                            seqno: prev_block_seqno,
2130                                            data: work_units::WuEventData::AnchorLag(lag),
2131                                        })
2132                                        .await
2133                                    {
2134                                        tracing::warn!(target: tracing_targets::COLLATOR,
2135                                            ?err,
2136                                            "error sending anchor lag to the tuner service",
2137                                        );
2138                                    }
2139                                } else {
2140                                    work_units::report_anchor_lag_to_metrics(
2141                                        &self.shard_id,
2142                                        lag.lag(),
2143                                    );
2144                                }
2145
2146                                imported_anchors_has_externals |= has_our_externals;
2147
2148                                // reduce used wu by one imported anchor
2149                                working_state.wu_used_from_last_anchor = working_state
2150                                    .wu_used_from_last_anchor
2151                                    .saturating_sub(wu_used_to_import_next_anchor);
2152
2153                                tracing::debug!(target: tracing_targets::COLLATOR,
2154                                    wu_used_from_last_anchor = working_state.wu_used_from_last_anchor,
2155                                    wu_used_to_import_next_anchor,
2156                                    "used wu dropped to",
2157                                );
2158
2159                                if working_state.wu_used_from_last_anchor
2160                                    < wu_used_to_import_next_anchor
2161                                {
2162                                    break;
2163                                }
2164
2165                                // even if accumulated wu still allow to import more anchors,
2166                                // we stop when mc block max interval thresold reached
2167
2168                                // NOTE: if collator is ahead of mempool, and wu was calculated inaccurately,
2169                                //      so that too much wu accumulated, we will not wait for many anchors,
2170                                //      we will produce mc block each max interval
2171                                if anchor.chain_time >= mc_block_max_interval_threshold {
2172                                    tracing::debug!(target: tracing_targets::COLLATOR,
2173                                        anchor_chain_time = anchor.chain_time,
2174                                        mc_block_max_interval_threshold,
2175                                        "stop importing anchors when mc block max interval threshold reached",
2176                                    );
2177                                    break;
2178                                }
2179                            }
2180                        },
2181                        ImportNextAnchor::Skipped => {
2182                            anchor_import_skipped = true;
2183                            break;
2184                        }
2185                    }
2186                }
2187
2188                metrics::gauge!("tycho_collator_import_next_anchor_count")
2189                    .set(imported_anchors_count);
2190
2191                if imported_anchors_has_externals {
2192                    tracing::debug!(target: tracing_targets::COLLATOR,
2193                        "just imported anchors has externals",
2194                    );
2195                }
2196
2197                let has_externals = has_externals || imported_anchors_has_externals;
2198
2199                match (has_uprocessed_messages, has_externals) {
2200                    // force master collation regardless of pending messages
2201                    _ if force_mc_block_by_uncommitted_chain => {
2202                        TryCollateCheck::ForceMcBlockByUncommittedChainLength
2203                    }
2204                    (true, _) => TryCollateCheck::HasUnprocessedMessages,
2205                    (false, true) => TryCollateCheck::HasExternals,
2206                    (false, false) => TryCollateCheck::NoPendingMessages,
2207                }
2208            }
2209        };
2210
2211        let (last_imported_anchor_id, last_imported_chain_time) = self
2212            .anchors_cache
2213            .get_last_imported_anchor_id_and_ct()
2214            .unwrap();
2215        let prev_block_chain_time = working_state.prev_shard_data_ref().gen_chain_time();
2216
2217        // when no messages for processing
2218        // then force empty shard block collation if related timeout elapsed
2219        if matches!(try_collate_check, TryCollateCheck::NoPendingMessages) {
2220            let empty_sc_block_interval_ms =
2221                working_state.collation_config.empty_sc_block_interval_ms as u64;
2222            let ct_elapsed_from_prev_block =
2223                last_imported_chain_time.saturating_sub(prev_block_chain_time);
2224            if empty_sc_block_interval_ms > 0
2225                && ct_elapsed_from_prev_block >= empty_sc_block_interval_ms
2226            {
2227                try_collate_check = TryCollateCheck::ForceEmptyShardBlock;
2228            }
2229        }
2230
2231        // collate next shard block or skip
2232        match try_collate_check {
2233            TryCollateCheck::HasUnprocessedMessages
2234            | TryCollateCheck::HasExternals
2235            | TryCollateCheck::ForceEmptyShardBlock => {
2236                tracing::debug!(target: tracing_targets::COLLATOR,
2237                    reason = ?try_collate_check,
2238                    anchor_import_skipped,
2239                    top_processed_to_anchor,
2240                    last_imported_anchor_id,
2241                    last_imported_chain_time,
2242                    prev_block_chain_time,
2243                    "will collate next shard block",
2244                );
2245
2246                // should force next master block collation after this shard block
2247                // when anchor import was skipped
2248                let force_next_mc_block = if anchor_import_skipped && uncommitted_chain_length > 4 {
2249                    ForceMasterCollation::ByAnchorImportSkipped
2250                } else {
2251                    ForceMasterCollation::No
2252                };
2253
2254                drop(histogram);
2255
2256                Box::pin(self.do_collate(
2257                    working_state,
2258                    None,
2259                    last_imported_chain_time,
2260                    force_next_mc_block,
2261                ))
2262                .await?;
2263            }
2264            TryCollateCheck::NoPendingMessages
2265            | TryCollateCheck::ForceMcBlockByUncommittedChainLength => {
2266                tracing::debug!(target: tracing_targets::COLLATOR,
2267                    reason = ?try_collate_check,
2268                    anchor_import_skipped,
2269                    force_mc_block_by_uncommitted_chain,
2270                    top_processed_to_anchor,
2271                    last_imported_anchor_id,
2272                    last_imported_chain_time,
2273                    prev_block_chain_time,
2274                    uncommitted_chain_length,
2275                    max_uncommitted_chain_length,
2276                    "will NOT collate next shard block, will notify collation manager",
2277                );
2278
2279                // we should force master block collation when anchor import was skipped
2280                // because without importing of next anchor chain time will not update
2281                // and master block interval will never be elapsed
2282                let force_mc_block = match try_collate_check {
2283                    TryCollateCheck::ForceMcBlockByUncommittedChainLength => {
2284                        ForceMasterCollation::ByUncommittedChain
2285                    }
2286                    TryCollateCheck::NoPendingMessages if anchor_import_skipped => {
2287                        ForceMasterCollation::ByAnchorImportSkipped
2288                    }
2289                    TryCollateCheck::NoPendingMessages if uncommitted_chain_length >= 1 => {
2290                        ForceMasterCollation::NoPendingMessagesAfterShardBlocks
2291                    }
2292                    _ => ForceMasterCollation::No,
2293                };
2294
2295                self.listener
2296                    .on_skipped(
2297                        working_state.mc_data.block_id,
2298                        working_state.next_block_id_short,
2299                        last_imported_chain_time,
2300                        force_mc_block,
2301                        working_state.collation_config.clone(),
2302                    )
2303                    .await?;
2304
2305                self.delayed_working_state.delay(working_state);
2306            }
2307        }
2308
2309        Ok(())
2310    }
2311}
2312
2313struct DelayedWorkingState {
2314    shard_id: ShardIdent,
2315    future: Option<DelayedWorkingStateFut>,
2316    unused: Option<Box<WorkingState>>,
2317}
2318
2319type DelayedWorkingStateFut =
2320    Pin<Box<dyn Future<Output = Result<Box<WorkingState>>> + Send + Sync + 'static>>;
2321
2322impl DelayedWorkingState {
2323    fn new<F>(shard_id: ShardIdent, f: F) -> Self
2324    where
2325        F: Future<Output = Result<Box<WorkingState>>> + Send + Sync + 'static,
2326    {
2327        Self {
2328            shard_id,
2329            future: Some(Box::pin(f)),
2330            unused: None,
2331        }
2332    }
2333
2334    async fn wait(&mut self) -> Result<Box<WorkingState>> {
2335        let labels = [("workchain", self.shard_id.workchain().to_string())];
2336        let _histogram = HistogramGuardWithLabels::begin(
2337            "tycho_collator_wait_for_working_state_time_high",
2338            &labels,
2339        );
2340
2341        if let Some(state) = self.unused.take() {
2342            return Ok(state);
2343        }
2344
2345        if let Some(fut) = self.future.take() {
2346            return fut.await;
2347        }
2348
2349        anyhow::bail!("No pending working state found");
2350    }
2351
2352    fn delay(&mut self, state: Box<WorkingState>) {
2353        self.unused = Some(state);
2354    }
2355
2356    fn reset(&mut self) {
2357        self.unused = None;
2358        self.future = None;
2359    }
2360}
2361
2362struct HandleGenesisResult {
2363    anchors_proc_info_opt: Option<AnchorsProcessingInfo>,
2364    genesis_info: GenesisInfo,
2365    /// Indicates if genesis was updated from the previous mc block collation
2366    genesis_updated: bool,
2367}
2368
2369struct AnchorsProcessingInfo {
2370    /// Last processed anchor for shard.
2371    /// When previous master did not include any shard block
2372    /// (there were no messages to process in the shard),
2373    /// will contain a `processed_to` info from master.
2374    pub processed_to_anchor_id: MempoolAnchorId,
2375    /// Messages offset in the last processed anchor.
2376    /// When previous master did not include any shard block
2377    /// (there were no messages to process in the shard),
2378    /// will contain a `processed_to` info from master.
2379    pub processed_to_msgs_offset: u64,
2380    /// Chain time of last imported anchor was used to collated block
2381    /// and was stored in the state after the block collation
2382    pub last_imported_chain_time: u64,
2383    /// Last imported chain time was used to collate this block
2384    pub last_imported_in_block_id: BlockId,
2385    /// The chain time of last imported anchor in current shard.
2386    /// For master block will always be equal to `last_imported_chain_time`.
2387    /// For shard block may be below `last_imported_chain_time` if there
2388    /// were no shard blocks between last and previous master (shard was not
2389    /// updated in last master block)
2390    pub current_shard_last_imported_chain_time: u64,
2391}
2392impl std::fmt::Debug for AnchorsProcessingInfo {
2393    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2394        std::fmt::Display::fmt(&self, f)
2395    }
2396}
2397impl std::fmt::Display for AnchorsProcessingInfo {
2398    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2399        f.debug_struct("AnchorsProcessingInfo")
2400            .field("processed_to_anchor_id", &self.processed_to_anchor_id)
2401            .field("processed_to_msgs_offset", &self.processed_to_msgs_offset)
2402            .field("last_imported_chain_time", &self.last_imported_chain_time)
2403            .field(
2404                "last_imported_in_block_id",
2405                &DebugDisplay(&self.last_imported_in_block_id),
2406            )
2407            .field(
2408                "current_shard_last_imported_chain_time",
2409                &self.current_shard_last_imported_chain_time,
2410            )
2411            .finish()
2412    }
2413}
2414
2415enum ImportNextAnchor {
2416    Result {
2417        prev_anchor_id: MempoolAnchorId,
2418        get_anchor_result: GetAnchorResult,
2419        has_our_externals: bool,
2420        requested_at: u64,
2421    },
2422    Skipped,
2423}
2424
2425#[derive(Debug)]
2426#[allow(dead_code)]
2427enum InitAnchorSource {
2428    FromCache(AnchorInfo),
2429    Imported(AnchorInfo),
2430}
2431
2432#[derive(Default)]
2433struct ImportInitAnchorsResult {
2434    anchors_info: Vec<InitAnchorSource>,
2435    /// Number of anchors that were imported
2436    /// after "last imported in current shard"
2437    /// up to "top last imported anchor"
2438    anchors_count_above_last_imported_in_current_shard: usize,
2439}
2440
2441/// Calculates maximum allowed lag in rounds between last imported anchor
2442/// and last processed. We won't import next anchors until we process more
2443/// previously imported anchors.
2444fn max_anchors_processing_lag_rounds<T: Into<u32>>(max_consensus_lag_rounds: T) -> u32 {
2445    max_consensus_lag_rounds
2446        .into()
2447        .saturating_mul(2)
2448        .saturating_div(3)
2449}
2450
2451#[derive(PartialEq, Eq)]
2452enum NextCollationFlowStep {
2453    Continue,
2454    Cancel,
2455}
2456
2457impl NextCollationFlowStep {
2458    pub fn should_cancel(&self) -> bool {
2459        *self == Self::Cancel
2460    }
2461}