1use std::pin::Pin;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anchors_cache::{AnchorInfo, AnchorsCache};
6use anyhow::{Context, Result};
7use async_trait::async_trait;
8use error::CollatorError;
9use futures_util::future::Future;
10use messages_reader::{MessagesReader, MessagesReaderContext};
11use tokio::sync::{Notify, oneshot};
12use tokio_util::sync::CancellationToken;
13use tracing::Instrument;
14use tycho_block_util::block::calc_next_block_id_short;
15use tycho_block_util::state::{ShardStateStuff, choose_genesis_info};
16use tycho_core::global_config::{MempoolGlobalConfig, ZerostateId};
17use tycho_core::storage::LoadStateHint;
18use tycho_network::PeerId;
19use tycho_types::models::*;
20use tycho_types::prelude::*;
21use tycho_util::futures::JoinTask;
22use tycho_util::mem::Reclaimer;
23use tycho_util::metrics::{HistogramGuard, HistogramGuardWithLabels};
24use tycho_util::time::now_millis;
25use types::MsgsExecutionParamsStuff;
26
27use self::types::{BlockSerializerCache, CollatorStats, PrevData, WorkingState};
28use crate::internal_queue::types::message::EnqueuedMessage;
29use crate::mempool::{GetAnchorResult, MempoolAdapter, MempoolAnchorId};
30use crate::queue_adapter::MessageQueueAdapter;
31use crate::state_node::StateNodeAdapter;
32use crate::types::processed_upto::ProcessedUptoInfoExtension;
33use crate::types::{
34 BlockCollationResult, CollationSessionId, CollationSessionInfo, CollatorConfig, DebugDisplay,
35 DisplayBlockIdsIntoIter, McData, TopBlockDescription,
36};
37use crate::utils::async_queued_dispatcher::{
38 AsyncQueuedDispatcher, STANDARD_QUEUED_DISPATCHER_BUFFER_SIZE,
39};
40use crate::{method_to_queued_async_closure, tracing_targets};
41
42mod anchors_cache;
43mod debug_info;
44mod do_collate;
45mod error;
46mod execution_manager;
47mod messages_buffer;
48mod messages_reader;
49mod types;
50
51#[cfg(any(test, feature = "bench-helpers"))]
52mod test_utils;
53
54#[cfg(feature = "bench-helpers")]
55pub mod bench_export {
56 pub use super::messages_buffer::{IncludeAllMessages, MessageGroup, MessagesBuffer};
57 pub use super::test_utils::make_stub_internal_parsed_message;
58 pub use super::types::ParsedMessage;
59}
60
61pub use do_collate::{is_first_block_after_prev_master, work_units};
62pub use error::CollationCancelReason;
63use messages_reader::state::ReaderState;
64pub use types::{ForceMasterCollation, ShardDescriptionExt};
65
66mod state;
67mod statistics;
68#[cfg(test)]
69#[path = "tests/collator_tests.rs"]
70pub(super) mod tests;
71
72#[cfg(test)]
73pub(crate) use messages_reader::tests::{TestInternalMessage, TestMessageFactory};
74
75use crate::collator::anchors_cache::AnchorsCacheTransaction;
76pub struct CollatorContext {
79 pub mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
80 pub mpool_adapter: Arc<dyn MempoolAdapter>,
81 pub state_node_adapter: Arc<dyn StateNodeAdapter>,
82 pub config: Arc<CollatorConfig>,
83 pub collation_session: Arc<CollationSessionInfo>,
84 pub listener: Arc<dyn CollatorEventListener>,
85 pub shard_id: ShardIdent,
86 pub prev_blocks_ids: Vec<BlockId>,
87 pub mc_data: Arc<McData>,
88 pub mempool_config_override: Option<MempoolGlobalConfig>,
89
90 pub cancel_collation: Arc<Notify>,
92 pub zerostate_id: ZerostateId,
93}
94
95#[async_trait]
96pub trait CollatorFactory: Send + Sync + 'static {
97 type Collator: Collator;
98
99 async fn start(&self, cx: CollatorContext) -> Result<Self::Collator>;
100}
101
102#[async_trait]
103impl<F, FT, R> CollatorFactory for F
104where
105 F: Fn(CollatorContext) -> FT + Send + Sync + 'static,
106 FT: Future<Output = Result<R>> + Send + 'static,
107 R: Collator,
108{
109 type Collator = R;
110
111 async fn start(&self, cx: CollatorContext) -> Result<Self::Collator> {
112 self(cx).await
113 }
114}
115
116#[async_trait]
119pub trait CollatorEventListener: Send + Sync {
120 async fn on_skipped(
122 &self,
123 prev_mc_block_id: BlockId,
124 next_block_id_short: BlockIdShort,
125 anchor_chain_time: u64,
126 force_mc_block: ForceMasterCollation,
127 collation_config: Arc<CollationConfig>,
128 ) -> Result<()>;
129 async fn on_cancelled(
131 &self,
132 prev_mc_block_id: BlockId,
133 next_block_id_short: BlockIdShort,
134 cancel_reason: CollationCancelReason,
135 ) -> Result<()>;
136 async fn on_block_candidate(&self, collation_result: BlockCollationResult) -> Result<()>;
138 async fn on_collator_stopped(&self, collation_session_id: CollationSessionId) -> Result<()>;
140}
141
142#[async_trait]
145pub trait Collator: Send + Sync + 'static {
146 async fn enqueue_stop(&self) -> Result<()>;
148 async fn enqueue_resume_collation(
150 &self,
151 mc_data: Arc<McData>,
152 reset: bool,
153 collation_session: Arc<CollationSessionInfo>,
154 prev_blocks_ids: Vec<BlockId>,
155 ) -> Result<()>;
156 async fn enqueue_try_collate(&self) -> Result<()>;
160 async fn enqueue_do_collate(
162 &self,
163 top_shard_blocks_info: Vec<TopBlockDescription>,
164 next_chain_time: u64,
165 ) -> Result<()>;
166}
167
168pub struct CollatorStdImplFactory {
169 pub wu_tuner_event_sender: Option<tokio::sync::mpsc::Sender<work_units::WuEvent>>,
170}
171
172#[async_trait]
173impl CollatorFactory for CollatorStdImplFactory {
174 type Collator = AsyncQueuedDispatcher<CollatorStdImpl>;
175
176 async fn start(&self, cx: CollatorContext) -> Result<Self::Collator> {
177 CollatorStdImpl::start(
178 cx.mq_adapter,
179 cx.mpool_adapter,
180 cx.state_node_adapter,
181 cx.config,
182 cx.collation_session,
183 cx.listener,
184 cx.zerostate_id,
185 cx.shard_id,
186 cx.prev_blocks_ids,
187 cx.mc_data,
188 cx.mempool_config_override,
189 cx.cancel_collation,
190 self.wu_tuner_event_sender.clone(),
191 )
192 .await
193 }
194}
195
196#[async_trait]
197impl Collator for AsyncQueuedDispatcher<CollatorStdImpl> {
198 async fn enqueue_stop(&self) -> Result<()> {
199 let cancel_token = self.cancel_token().clone();
200 self.enqueue_task(method_to_queued_async_closure!(stop_collator, cancel_token))
201 .await
202 }
203
204 async fn enqueue_resume_collation(
206 &self,
207 mc_data: Arc<McData>,
208 reset: bool,
209 collation_session: Arc<CollationSessionInfo>,
210 prev_blocks_ids: Vec<BlockId>,
211 ) -> Result<()> {
212 self.enqueue_task(method_to_queued_async_closure!(
213 resume_collation_wrapper,
214 mc_data,
215 reset,
216 collation_session,
217 prev_blocks_ids
218 ))
219 .await
220 }
221
222 async fn enqueue_try_collate(&self) -> Result<()> {
223 self.enqueue_task(method_to_queued_async_closure!(
224 wait_state_and_try_collate_wrapper,
225 ))
226 .await
227 }
228
229 async fn enqueue_do_collate(
230 &self,
231 top_shard_blocks_info: Vec<TopBlockDescription>,
232 next_chain_time: u64,
233 ) -> Result<()> {
234 self.enqueue_task(method_to_queued_async_closure!(
235 wait_state_and_do_collate_wrapper,
236 top_shard_blocks_info,
237 next_chain_time
238 ))
239 .await
240 }
241}
242
243pub struct CollatorStdImpl {
244 next_block_info: BlockIdShort,
245
246 config: Arc<CollatorConfig>,
247 collation_session: Arc<CollationSessionInfo>,
248
249 listener: Arc<dyn CollatorEventListener>,
250 mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
251 mpool_adapter: Arc<dyn MempoolAdapter>,
252 state_node_adapter: Arc<dyn StateNodeAdapter>,
253 shard_id: ShardIdent,
254
255 delayed_working_state: DelayedWorkingState,
256
257 background_store_new_state_tx:
258 tokio::sync::mpsc::UnboundedSender<JoinTask<Result<ShardStateStuff>>>,
259
260 anchors_cache: AnchorsCache,
261 block_serializer_cache: BlockSerializerCache,
262 stats: CollatorStats,
263 timer: std::time::Instant,
264 anchor_timer: std::time::Instant,
265 shard_blocks_count_from_last_anchor: u16,
266
267 mempool_config_override: Option<MempoolGlobalConfig>,
271
272 cancel_collation: Arc<Notify>,
274
275 wu_tuner_event_sender: Option<tokio::sync::mpsc::Sender<work_units::WuEvent>>,
277 zerostate_id: ZerostateId,
278}
279
280impl CollatorStdImpl {
281 #[allow(clippy::too_many_arguments)]
282 pub async fn start(
283 mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
284 mpool_adapter: Arc<dyn MempoolAdapter>,
285 state_node_adapter: Arc<dyn StateNodeAdapter>,
286 config: Arc<CollatorConfig>,
287 collation_session: Arc<CollationSessionInfo>,
288 listener: Arc<dyn CollatorEventListener>,
289 zerostate_id: ZerostateId,
290 shard_id: ShardIdent,
291 prev_blocks_ids: Vec<BlockId>,
292 mc_data: Arc<McData>,
293 mempool_config_override: Option<MempoolGlobalConfig>,
294 cancel_collation: Arc<Notify>,
295 wu_tuner_event_sender: Option<tokio::sync::mpsc::Sender<work_units::WuEvent>>,
296 ) -> Result<AsyncQueuedDispatcher<Self>> {
297 const BLOCK_CELL_COUNT_BASELINE: usize = 100_000;
298
299 let next_block_info = calc_next_block_id_short(&prev_blocks_ids);
300
301 tracing::info!(target: tracing_targets::COLLATOR,
302 "(next_block_id={}): collator starting...", next_block_info,
303 );
304
305 let (working_state_tx, working_state_rx) = oneshot::channel::<Result<Box<WorkingState>>>();
306
307 let (background_store_new_state_tx, mut background_store_new_state_rx) =
308 tokio::sync::mpsc::unbounded_channel();
309
310 let processor = Self {
311 next_block_info,
312 config,
313 collation_session,
314 listener,
315 mq_adapter,
316 mpool_adapter,
317 state_node_adapter,
318 shard_id,
319 delayed_working_state: DelayedWorkingState::new(shard_id, async move {
320 match working_state_rx.await {
321 Ok(state) => state,
322 Err(_) => anyhow::bail!("collator init cancelled"),
323 }
324 }),
325 background_store_new_state_tx,
326 anchors_cache: Default::default(),
327 block_serializer_cache: BlockSerializerCache::with_capacity(BLOCK_CELL_COUNT_BASELINE),
328 stats: Default::default(),
329 timer: std::time::Instant::now(),
330 anchor_timer: std::time::Instant::now(),
331 shard_blocks_count_from_last_anchor: 0,
332 mempool_config_override,
333 cancel_collation,
334 wu_tuner_event_sender,
335 zerostate_id,
336 };
337
338 tokio::spawn(async move {
340 while let Some(task) = background_store_new_state_rx.recv().await {
341 if let Err(err) = task.await {
342 tracing::error!(target: tracing_targets::COLLATOR,
343 "Error when store new state: {:?}", err,
344 );
345 }
346 }
347 });
348
349 let dispatcher =
351 AsyncQueuedDispatcher::create(processor, STANDARD_QUEUED_DISPATCHER_BUFFER_SIZE);
352 tracing::trace!(target: tracing_targets::COLLATOR,
353 "(next_block_id={}): collator tasks queue dispatcher started", next_block_info,
354 );
355
356 dispatcher
359 .enqueue_task(method_to_queued_async_closure!(
360 init_collator_wrapper,
361 prev_blocks_ids,
362 mc_data,
363 working_state_tx
364 ))
365 .await
366 .context("task receiver had to be not closed or dropped here")?;
367 tracing::info!(target: tracing_targets::COLLATOR,
368 "(next_block_id={}): collator initialization task enqueued", next_block_info,
369 );
370
371 tracing::info!(target: tracing_targets::COLLATOR,
372 "(next_block_id={}): collator started", next_block_info,
373 );
374
375 Ok(dispatcher)
376 }
377
378 async fn stop_collator(&mut self, dispatcher_cancel_token: CancellationToken) -> Result<()> {
379 self.listener
380 .on_collator_stopped(self.collation_session.id())
381 .await?;
382 dispatcher_cancel_token.cancel();
383 Ok(())
384 }
385
386 async fn init_collator_wrapper(
387 &mut self,
388 prev_blocks_ids: Vec<BlockId>,
389 mc_data: Arc<McData>,
390 working_state_tx: oneshot::Sender<Result<Box<WorkingState>>>,
391 ) -> Result<()> {
392 Box::pin(self.init_collator(prev_blocks_ids, mc_data, working_state_tx))
393 .await
394 .with_context(|| format!("next_block_id: {}", self.next_block_info))
395 }
396
397 #[tracing::instrument(skip_all, fields(next_block_id = %self.next_block_info))]
399 async fn init_collator(
400 &mut self,
401 prev_blocks_ids: Vec<BlockId>,
402 mc_data: Arc<McData>,
403 working_state_tx: oneshot::Sender<Result<Box<WorkingState>>>,
404 ) -> Result<()> {
405 tracing::info!(target: tracing_targets::COLLATOR, "initializing...");
406
407 let mut working_state = Self::init_working_state(
409 &self.next_block_info,
410 self.state_node_adapter.clone(),
411 mc_data,
412 prev_blocks_ids,
413 )
414 .await?;
415
416 let HandleGenesisResult {
418 anchors_proc_info_opt,
419 genesis_info,
420 genesis_updated,
421 } = self.handle_mempool_genesis(&mut working_state).await?;
422
423 if self
425 .try_import_init_anchors(&mut working_state, anchors_proc_info_opt, &genesis_info)
426 .await?
427 .should_cancel()
428 {
429 return Ok(());
430 }
431
432 working_state_tx.send(Ok(working_state)).ok();
433
434 self.timer = std::time::Instant::now();
435
436 self.anchor_timer = std::time::Instant::now();
437
438 tracing::info!(target: tracing_targets::COLLATOR, "init finished");
439
440 tracing::info!(target: tracing_targets::COLLATOR, "trying to collate next block after init...");
442 Box::pin(self.wait_state_and_try_collate(genesis_updated)).await?;
443
444 Ok(())
445 }
446
447 async fn handle_mempool_genesis(
450 &mut self,
451 working_state: &mut WorkingState,
452 ) -> Result<HandleGenesisResult> {
453 let anchors_proc_info_opt = {
455 let prev_shard_data = working_state.prev_shard_data_ref();
456 let prev_block_id = prev_shard_data.blocks_ids()[0]; Self::get_anchors_processing_info(
458 &working_state.next_block_id_short.shard,
459 &working_state.mc_data,
460 &prev_block_id,
461 prev_shard_data.gen_chain_time(),
462 prev_shard_data
463 .processed_upto()
464 .get_min_externals_processed_to()?,
465 )
466 };
467
468 let (genesis_info, genesis_updated) = choose_genesis_info(
470 working_state.mc_data.consensus_info.genesis_info,
471 working_state
472 .mc_data
473 .prev_mc_data
474 .as_ref()
475 .map(|mcd| mcd.consensus_info.genesis_info),
476 self.mempool_config_override
477 .as_ref()
478 .map(|o| o.genesis_info),
479 );
480
481 tracing::debug!(target: tracing_targets::COLLATOR,
482 ?genesis_info,
483 genesis_updated,
484 "checked if genesis info was updated since the last mc block collation",
485 );
486
487 if !genesis_updated {
488 return Ok(HandleGenesisResult {
489 anchors_proc_info_opt,
490 genesis_info,
491 genesis_updated,
492 });
493 }
494
495 self.anchors_cache.clear();
497
498 if let Some(anchors_proc_info) = &anchors_proc_info_opt {
501 self.set_anchors_cache_last_imported_from_genesis(&genesis_info, anchors_proc_info);
502 }
503
504 for externals_range_state in working_state.reader_state.externals.ranges.values_mut() {
506 for partition in externals_range_state.by_partitions.values_mut() {
507 let buffer_to_drop = std::mem::take(&mut partition.buffer);
508 Reclaimer::instance().drop(buffer_to_drop);
509 }
510 }
511
512 tracing::info!(target: tracing_targets::COLLATOR,
513 ?anchors_proc_info_opt,
514 ?genesis_info,
515 genesis_updated,
516 "externals reset on genesis update finished",
517 );
518
519 Ok(HandleGenesisResult {
520 anchors_proc_info_opt,
521 genesis_info,
522 genesis_updated,
523 })
524 }
525
526 fn set_anchors_cache_last_imported_from_genesis(
527 &mut self,
528 genesis_info: &GenesisInfo,
529 anchors_proc_info: &AnchorsProcessingInfo,
530 ) {
531 self.anchors_cache.add_imported_anchor_info(AnchorInfo {
532 id: genesis_info.start_round,
533 ct: anchors_proc_info.last_imported_chain_time,
534 all_exts_count: 0,
535 our_exts_count: 0,
536 author: PeerId(HashBytes::ZERO.0),
541 });
542 }
543
544 async fn check_and_import_init_anchors(
545 &mut self,
546 anchors_proc_info: &AnchorsProcessingInfo,
547 genesis_info: &GenesisInfo,
548 ) -> Result<ImportInitAnchorsResult, CollatorError> {
549 tracing::debug!(target: tracing_targets::COLLATOR,
550 %anchors_proc_info,
551 ?genesis_info,
552 "will check and import init anchors",
553 );
554
555 let import_init_anchors = if genesis_info.start_round > 0 {
557 if anchors_proc_info.processed_to_anchor_id > genesis_info.start_round {
560 true
561 }
562 else if anchors_proc_info.processed_to_anchor_id == genesis_info.start_round {
566 false
567 }
568 else if self.shard_id.is_masterchain() {
571 return Err(CollatorError::Cancelled(
572 CollationCancelReason::AnchorNotFound(anchors_proc_info.processed_to_anchor_id),
573 ));
574 }
575 else {
579 false
580 }
581 } else {
582 true
584 };
585
586 if import_init_anchors {
587 tracing::info!(target: tracing_targets::COLLATOR,
588 "importing init anchors from processed to anchor ({}) with offset ({}) \
589 to chain_time {} (current_shard_last_imported_chain_time = {})",
590 anchors_proc_info.processed_to_anchor_id,
591 anchors_proc_info.processed_to_msgs_offset,
592 anchors_proc_info.last_imported_chain_time,
593 anchors_proc_info.current_shard_last_imported_chain_time,
594 );
595
596 Self::import_init_anchors(
597 anchors_proc_info.processed_to_anchor_id,
598 anchors_proc_info.processed_to_msgs_offset,
599 anchors_proc_info.last_imported_chain_time,
600 anchors_proc_info.current_shard_last_imported_chain_time,
601 self.shard_id,
602 &mut self.anchors_cache,
603 self.mpool_adapter.clone(),
604 )
605 .await
606 } else {
607 tracing::info!(target: tracing_targets::COLLATOR,
608 "will not import init anchors",
609 );
610
611 if self.anchors_cache.last_imported_anchor_info().is_none() {
614 self.set_anchors_cache_last_imported_from_genesis(genesis_info, anchors_proc_info);
615 }
616
617 Ok(Default::default())
618 }
619 }
620
621 async fn try_import_init_anchors(
624 &mut self,
625 working_state: &mut WorkingState,
626 anchors_proc_info_opt: Option<AnchorsProcessingInfo>,
627 genesis_info: &GenesisInfo,
628 ) -> Result<NextCollationFlowStep> {
629 let Some(anchors_proc_info) = anchors_proc_info_opt else {
630 return Ok(NextCollationFlowStep::Continue);
631 };
632
633 let timer = std::time::Instant::now();
634
635 let cancel_collation = self.cancel_collation.clone();
638
639 let import_fut = self.check_and_import_init_anchors(&anchors_proc_info, genesis_info);
641 let import_res = tokio::select! {
642 res = import_fut => res,
643 _ = cancel_collation.notified() => {
644 tracing::info!(target: tracing_targets::COLLATOR,
645 "collation was cancelled by manager",
646 );
647 let labels = [("workchain", self.shard_id.workchain().to_string())];
648 metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels[..]).increment(1);
649 Err(CollatorError::Cancelled(CollationCancelReason::ExternalCancel))
650 }
651 };
652
653 let ImportInitAnchorsResult {
654 anchors_info,
655 mut anchors_count_above_last_imported_in_current_shard,
656 } = match import_res {
657 Err(CollatorError::Cancelled(reason)) => {
658 self.listener
659 .on_cancelled(
660 working_state.mc_data.block_id,
661 working_state.next_block_id_short,
662 reason,
663 )
664 .await?;
665 return Ok(NextCollationFlowStep::Cancel);
666 }
667 res => res?,
668 };
669
670 if !anchors_info.is_empty() {
671 tracing::debug!(target: tracing_targets::COLLATOR,
672 elapsed = timer.elapsed().as_millis(),
673 new_next_block_id = %self.next_block_info,
674 "imported init anchors: {:?}",
675 anchors_info.as_slice(),
676 );
677
678 let wu_used = &mut working_state.wu_used_from_last_anchor;
682 let wu_step = working_state.collation_config.wu_used_to_import_next_anchor;
683 while anchors_count_above_last_imported_in_current_shard > 0 {
684 anchors_count_above_last_imported_in_current_shard -= 1;
685 if let Some(new_wu_used) = wu_used.checked_sub(wu_step) {
686 *wu_used = new_wu_used;
687 } else {
688 break;
689 }
690 }
691 }
692
693 Ok(NextCollationFlowStep::Continue)
694 }
695
696 async fn resume_collation_wrapper(
697 &mut self,
698 mc_data: Arc<McData>,
699 reset: bool,
700 collation_session: Arc<CollationSessionInfo>,
701 new_prev_blocks_ids: Vec<BlockId>,
702 ) -> Result<()> {
703 Box::pin(self.resume_collation(mc_data, reset, collation_session, new_prev_blocks_ids))
704 .await
705 .with_context(|| format!("next_block_id: {}", self.next_block_info))
706 }
707
708 #[tracing::instrument(skip_all, fields(next_block_id = %self.next_block_info))]
709 async fn resume_collation(
710 &mut self,
711 mc_data: Arc<McData>,
712 reset: bool,
713 collation_session: Arc<CollationSessionInfo>,
714 new_prev_blocks_ids: Vec<BlockId>,
715 ) -> Result<()> {
716 let labels = [("workchain", self.shard_id.workchain().to_string())];
717 let histogram =
718 HistogramGuard::begin_with_labels("tycho_collator_resume_collation_time_high", &labels);
719
720 self.collation_session = collation_session;
722
723 let genesis_was_updated;
725
726 let mut working_state = if !reset {
727 let mut working_state = self.delayed_working_state.wait().await?;
728
729 tracing::info!(target: tracing_targets::COLLATOR,
730 mc_data_block_id = %working_state.mc_data.block_id.as_short_id(),
731 "resume collation without reset",
732 );
733
734 let prev_mc_seqno = working_state.mc_data.block_id.seqno;
736 if prev_mc_seqno < mc_data.block_id.seqno {
737 working_state.collation_config = Arc::new(mc_data.config.get_collation_config()?);
738 working_state.mc_data = mc_data;
739
740 if working_state.has_unprocessed_messages == Some(false) {
741 working_state.has_unprocessed_messages = None;
742 }
743
744 if !self.shard_id.is_masterchain() {
747 Self::reload_prev_data(
748 prev_mc_seqno,
749 &mut working_state,
750 self.state_node_adapter.clone(),
751 LoadStateHint {
752 allow_ignore_direct: false,
753 },
754 )
755 .await?;
756 }
757 }
758
759 let HandleGenesisResult {
761 genesis_updated, ..
762 } = self.handle_mempool_genesis(&mut working_state).await?;
763 genesis_was_updated = genesis_updated;
764
765 working_state
766 } else {
767 self.delayed_working_state.reset();
769
770 self.next_block_info = calc_next_block_id_short(&new_prev_blocks_ids);
771
772 tracing::info!(target: tracing_targets::COLLATOR,
773 mc_data_block_id = %mc_data.block_id.as_short_id(),
774 new_prev_blocks_ids = %DisplayBlockIdsIntoIter(&new_prev_blocks_ids),
775 new_next_block_id = %self.next_block_info,
776 "resume collation with reset",
777 );
778
779 tracing::debug!(target: tracing_targets::COLLATOR,
781 new_next_block_id = %self.next_block_info,
782 "reset working state and msgs buffer",
783 );
784 let mut working_state = Self::init_working_state(
785 &self.next_block_info,
786 self.state_node_adapter.clone(),
787 mc_data,
788 new_prev_blocks_ids,
789 )
790 .await?;
791
792 let HandleGenesisResult {
794 anchors_proc_info_opt,
795 genesis_info,
796 genesis_updated,
797 } = self.handle_mempool_genesis(&mut working_state).await?;
798 genesis_was_updated = genesis_updated;
799
800 if self
802 .try_import_init_anchors(&mut working_state, anchors_proc_info_opt, &genesis_info)
803 .await?
804 .should_cancel()
805 {
806 return Ok(());
807 }
808
809 working_state
810 };
811
812 working_state.resume_collation_elapsed = histogram.finish();
814
815 if self.shard_id.is_masterchain() {
816 Box::pin(self.try_collate_next_master_block_impl(working_state, genesis_was_updated))
817 .await
818 } else {
819 Box::pin(self.try_collate_next_shard_block_impl(working_state, genesis_was_updated))
820 .await
821 }
822 }
823
824 #[tracing::instrument(skip_all, fields(next_block_id = %next_block_id_short))]
825 async fn init_working_state(
826 next_block_id_short: &BlockIdShort,
827 state_node_adapter: Arc<dyn StateNodeAdapter>,
828 mc_data: Arc<McData>,
829 prev_blocks_ids: Vec<BlockId>,
830 ) -> Result<Box<WorkingState>> {
831 tracing::debug!(target: tracing_targets::COLLATOR,
833 prev_blocks_ids = %DisplayBlockIdsIntoIter(&prev_blocks_ids),
834 "loading prev states and queue diffs...",
835 );
836 let (prev_states, prev_queue_diff_hashes) = Self::load_states_and_diffs(
837 mc_data.block_id.seqno,
838 state_node_adapter,
839 prev_blocks_ids,
840 )
841 .await?;
842
843 tracing::debug!(target: tracing_targets::COLLATOR, "building working state...");
845
846 Self::build_and_validate_init_working_state(mc_data, prev_states, prev_queue_diff_hashes)
847 }
848
849 async fn reload_prev_data(
850 prev_mc_seqno: u32,
851 working_state: &mut WorkingState,
852 state_node_adapter: Arc<dyn StateNodeAdapter>,
853 hint: LoadStateHint,
854 ) -> Result<()> {
855 working_state.usage_tree.take();
857
858 let prev_shard_data = working_state
860 .prev_shard_data
861 .as_ref()
862 .expect("should exist here");
863 let prev_queue_diff_hashes = prev_shard_data.prev_queue_diff_hashes().clone();
864 let prev_blocks_ids = prev_shard_data.blocks_ids();
865
866 tracing::debug!(target: tracing_targets::COLLATOR,
868 prev_blocks_ids = %DisplayBlockIdsIntoIter(prev_blocks_ids),
869 "loading prev states...",
870 );
871 let prev_states = Self::load_prev_states(
872 prev_mc_seqno,
873 state_node_adapter.as_ref(),
874 prev_blocks_ids,
875 hint,
876 )
877 .await?;
878
879 tracing::debug!(target: tracing_targets::COLLATOR, "updating prev data in working state from reloaded state root...");
881
882 let (prev_shard_data, usage_tree) = PrevData::build(prev_states, prev_queue_diff_hashes)?;
883
884 working_state.prev_shard_data = Some(prev_shard_data);
886 working_state.usage_tree = Some(usage_tree);
887
888 Ok(())
889 }
890
891 #[allow(clippy::too_many_arguments)]
893 fn prepare_working_state_update(
894 &mut self,
895 new_observable_state: ShardStateStuff,
896 store_new_state_task: JoinTask<Result<ShardStateStuff>>,
897 new_queue_diff_hash: HashBytes,
898 new_mc_data: Arc<McData>,
899 collation_config: Arc<CollationConfig>,
900 has_unprocessed_messages: bool,
901 reader_state: ReaderState,
902 resume_collation_elapsed: Duration,
903 ) -> Result<()> {
904 enum GetNewShardStateStuff {
905 ReloadFromStorage(JoinTask<Result<ShardStateStuff>>),
906 BuildFromNewObservable(ShardStateStuff),
907 }
908
909 let block_id = *new_observable_state.block_id();
910
911 let ref_mc_state_handle = new_observable_state.ref_mc_state_handle().clone();
912 let get_new_state_stuff = if block_id.is_masterchain() {
913 GetNewShardStateStuff::ReloadFromStorage(store_new_state_task)
914 } else {
915 self.background_store_new_state_tx
916 .send(store_new_state_task)
917 .ok();
918
919 GetNewShardStateStuff::BuildFromNewObservable(new_observable_state)
920 };
921
922 let labels = [("workchain", self.shard_id.workchain().to_string())];
923 self.delayed_working_state.future = Some(Box::pin(async move {
924 let _histogram = HistogramGuard::begin_with_labels(
925 "tycho_collator_build_new_state_time_high",
926 &labels,
927 );
928
929 let new_state_stuff = match get_new_state_stuff {
930 GetNewShardStateStuff::BuildFromNewObservable(state) => state,
931 GetNewShardStateStuff::ReloadFromStorage(store_new_state_task) => {
932 store_new_state_task.await?
933 }
934 };
935
936 drop(ref_mc_state_handle);
940
941 let prev_states = vec![new_state_stuff];
942 let prev_queue_diff_hashes = vec![new_queue_diff_hash];
943 let (prev_shard_data, usage_tree) =
944 PrevData::build(prev_states, prev_queue_diff_hashes)?;
945
946 let next_block_id_short = calc_next_block_id_short(prev_shard_data.blocks_ids());
947
948 Ok(Box::new(WorkingState {
949 next_block_id_short,
950 mc_data: new_mc_data,
951 collation_config,
952 wu_used_from_last_anchor: prev_shard_data.wu_used_from_last_anchor(),
953 resume_collation_elapsed,
954 prev_shard_data: Some(prev_shard_data),
955 usage_tree: Some(usage_tree),
956 has_unprocessed_messages: Some(has_unprocessed_messages),
957 reader_state,
958 }))
959 }));
960
961 Ok(())
962 }
963
964 async fn load_states_and_diffs(
966 ref_by_mc_seqno: u32,
967 state_node_adapter: Arc<dyn StateNodeAdapter>,
968 prev_blocks_ids: Vec<BlockId>,
969 ) -> Result<(Vec<ShardStateStuff>, Vec<HashBytes>)> {
970 let load_state_fut: JoinTask<Result<Vec<ShardStateStuff>>> = JoinTask::new({
972 let state_node_adapter = state_node_adapter.clone();
973 let prev_blocks_ids = prev_blocks_ids.clone();
974 let span = tracing::Span::current();
975 async move {
976 Self::load_prev_states(
977 ref_by_mc_seqno,
978 state_node_adapter.as_ref(),
979 &prev_blocks_ids,
980 Default::default(),
981 )
982 .await
983 }
984 .instrument(span)
985 });
986
987 let prev_hashes_fut: JoinTask<Result<Vec<HashBytes>>> = JoinTask::new({
988 let span = tracing::Span::current();
989 async move {
990 let mut prev_hashes = vec![];
991 for prev_block_id in prev_blocks_ids {
992 if let Some(diff) = state_node_adapter.load_diff(&prev_block_id).await? {
994 tracing::debug!(target: tracing_targets::COLLATOR,
995 "loaded queue diff for prev_block_id {}",
996 prev_block_id.as_short_id(),
997 );
998 prev_hashes.push(*diff.diff_hash());
999 }
1000 }
1001 Ok(prev_hashes)
1002 }
1003 .instrument(span)
1004 });
1005
1006 let (prev_states, prev_hash) =
1007 futures_util::future::join(load_state_fut, prev_hashes_fut).await;
1008
1009 Ok((prev_states?, prev_hash?))
1010 }
1011
1012 async fn load_prev_states(
1013 prev_mc_seqno: u32,
1014 state_node_adapter: &dyn StateNodeAdapter,
1015 prev_blocks_ids: &[BlockId],
1016 hint: LoadStateHint,
1017 ) -> Result<Vec<ShardStateStuff>> {
1018 let mut prev_states = vec![];
1019 for prev_block_id in prev_blocks_ids {
1020 let state = state_node_adapter
1021 .load_state(prev_mc_seqno, prev_block_id, hint)
1022 .await?;
1023 tracing::debug!(target: tracing_targets::COLLATOR,
1024 "loaded prev shard state for prev_block_id {}",
1025 prev_block_id.as_short_id(),
1026 );
1027 prev_states.push(state);
1028 }
1029 Ok(prev_states)
1030 }
1031
1032 fn build_and_validate_init_working_state(
1039 mc_data: Arc<McData>,
1040 prev_states: Vec<ShardStateStuff>,
1041 prev_queue_diff_hashes: Vec<HashBytes>,
1042 ) -> Result<Box<WorkingState>> {
1043 let (prev_shard_data, usage_tree) = PrevData::build(prev_states, prev_queue_diff_hashes)?;
1046
1047 let next_block_id_short = calc_next_block_id_short(prev_shard_data.blocks_ids());
1048
1049 let collation_config = Arc::new(mc_data.config.get_collation_config()?);
1050
1051 Ok(Box::new(WorkingState {
1052 next_block_id_short,
1053 mc_data,
1054 wu_used_from_last_anchor: prev_shard_data.wu_used_from_last_anchor(),
1055 resume_collation_elapsed: Duration::ZERO,
1056 reader_state: ReaderState::new(prev_shard_data.processed_upto()),
1057 prev_shard_data: Some(prev_shard_data),
1058 usage_tree: Some(usage_tree),
1059 has_unprocessed_messages: None,
1060 collation_config,
1061 }))
1062 }
1063
1064 fn get_anchors_processing_info(
1065 shard_id: &ShardIdent,
1066 mc_data: &McData,
1067 prev_block_id: &BlockId,
1068 prev_gen_chain_time: u64,
1069 prev_externals_processed_to: (MempoolAnchorId, u64),
1070 ) -> Option<AnchorsProcessingInfo> {
1071 let mut from_mc_info_opt = None;
1077 if !shard_id.is_masterchain() {
1078 let (mc_processed_to_anchor_id, mc_processed_to_msgs_offset) = mc_data
1079 .processed_upto
1080 .get_min_externals_processed_to()
1081 .unwrap_or_default();
1082 if mc_processed_to_anchor_id > 0 {
1084 for (top_shard_id, top_shard_descr) in mc_data.shards.iter() {
1093 if shard_id == top_shard_id {
1094 if prev_block_id.seqno == top_shard_descr.seqno
1095 && !top_shard_descr.top_sc_block_updated
1096 {
1097 from_mc_info_opt = Some(AnchorsProcessingInfo {
1098 processed_to_anchor_id: mc_processed_to_anchor_id,
1099 processed_to_msgs_offset: mc_processed_to_msgs_offset,
1100 last_imported_chain_time: mc_data.gen_chain_time,
1101 last_imported_in_block_id: mc_data.block_id,
1102 current_shard_last_imported_chain_time: prev_gen_chain_time,
1103 });
1104 }
1105 break;
1106 }
1107 }
1108 }
1109 }
1110
1111 let from_prev_info_opt = match prev_externals_processed_to {
1121 (processed_to_anchor_id, processed_to_msgs_offset) if processed_to_anchor_id > 0 => {
1123 Some(AnchorsProcessingInfo {
1124 processed_to_anchor_id,
1125 processed_to_msgs_offset,
1126 last_imported_chain_time: prev_gen_chain_time,
1127 last_imported_in_block_id: *prev_block_id,
1128 current_shard_last_imported_chain_time: prev_gen_chain_time,
1129 })
1130 }
1131 _ => None,
1132 };
1133
1134 match (from_mc_info_opt, from_prev_info_opt) {
1136 (Some(from_mc_info), Some(from_prev_info)) => {
1143 if from_mc_info.processed_to_anchor_id > from_prev_info.processed_to_anchor_id
1144 || (from_mc_info.processed_to_anchor_id
1145 == from_prev_info.processed_to_anchor_id
1146 && from_mc_info.processed_to_msgs_offset
1147 > from_prev_info.processed_to_msgs_offset)
1148 {
1149 Some(from_mc_info)
1150 } else {
1151 Some(from_prev_info)
1152 }
1153 }
1154 (from_mc_info_opt, None) => from_mc_info_opt,
1158 (None, from_prev_info_opt) => from_prev_info_opt,
1165 }
1166 }
1167
1168 async fn import_next_anchor(
1174 shard_id: ShardIdent,
1175 anchors_cache: &mut AnchorsCache,
1176 mpool_adapter: Arc<dyn MempoolAdapter>,
1177 top_processed_to_anchor: MempoolAnchorId,
1178 max_consensus_lag_rounds: u32,
1179 force_anchor_import: bool,
1180 ) -> Result<ImportNextAnchor> {
1181 let labels = [("workchain", shard_id.workchain().to_string())];
1182
1183 let _histogram =
1184 HistogramGuardWithLabels::begin("tycho_collator_import_next_anchor_time_high", &labels);
1185
1186 let timer = std::time::Instant::now();
1187
1188 let (prev_anchor_id, ct) = anchors_cache
1189 .get_last_imported_anchor_id_and_ct()
1190 .unwrap_or_default();
1191
1192 if !force_anchor_import
1195 && prev_anchor_id.saturating_sub(top_processed_to_anchor)
1196 > max_anchors_processing_lag_rounds(max_consensus_lag_rounds)
1197 {
1198 metrics::counter!("tycho_collator_anchor_import_skipped_count", &labels).increment(1);
1199 return Ok(ImportNextAnchor::Skipped);
1200 }
1201
1202 let requested_at = now_millis();
1203
1204 let get_anchor_result = mpool_adapter.get_next_anchor(prev_anchor_id).await?;
1205
1206 let has_our_externals = match &get_anchor_result {
1207 GetAnchorResult::Exist(next_anchor) => {
1208 let our_exts_count = next_anchor.count_externals_for(&shard_id, 0);
1209 anchors_cache.add(next_anchor.clone(), our_exts_count);
1210
1211 let has_externals = our_exts_count > 0;
1212
1213 metrics::counter!("tycho_collator_ext_msgs_imported_count", &labels)
1214 .increment(our_exts_count as _);
1215 metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
1216 .increment(our_exts_count as f64);
1217
1218 let chain_time_elapsed = next_anchor.chain_time.saturating_sub(ct);
1219
1220 tracing::info!(target: tracing_targets::COLLATOR,
1221 elapsed = timer.elapsed().as_millis(),
1222 chain_time_elapsed,
1223 "imported next anchor (id: {}, chain_time: {}, all_exts: {}, our_exts: {})",
1224 next_anchor.id,
1225 next_anchor.chain_time,
1226 next_anchor.externals.len(),
1227 our_exts_count,
1228 );
1229
1230 has_externals
1231 }
1232 GetAnchorResult::NotExist => false,
1233 };
1234
1235 Ok(ImportNextAnchor::Result {
1236 prev_anchor_id,
1237 get_anchor_result,
1238 has_our_externals,
1239 requested_at,
1240 })
1241 }
1242
1243 pub(self) async fn import_init_anchors(
1247 processed_to_anchor_id: MempoolAnchorId,
1248 processed_to_msgs_offset: u64,
1249 last_block_chain_time: u64,
1250 current_shard_last_imported_chain_time: u64,
1251 shard_id: ShardIdent,
1252 anchors_cache: &mut AnchorsCache,
1253 mpool_adapter: Arc<dyn MempoolAdapter>,
1254 ) -> Result<ImportInitAnchorsResult, CollatorError> {
1255 let labels = [("workchain", shard_id.workchain().to_string())];
1256
1257 let mut res = ImportInitAnchorsResult::default();
1258
1259 let mut last_anchor = None;
1260 let mut all_anchors_are_taken_from_cache = false;
1261 let mut processed_to_anchor_exists_in_cache = false;
1262
1263 anchors_cache.remove_last_imported_above(last_block_chain_time);
1265
1266 for anchor in anchors_cache.iter().map(|(_, ca)| &ca.anchor) {
1268 if anchor.id >= processed_to_anchor_id {
1269 if anchor.id == processed_to_anchor_id {
1271 processed_to_anchor_exists_in_cache = true;
1272
1273 if anchor.externals.len() == processed_to_msgs_offset as usize {
1275 continue;
1276 }
1277 }
1278
1279 if !processed_to_anchor_exists_in_cache {
1288 break;
1289 }
1290
1291 if anchor.chain_time == last_block_chain_time {
1294 all_anchors_are_taken_from_cache = true;
1295 }
1296
1297 let our_exts_count = anchor.count_externals_for(&shard_id, 0);
1299 res.anchors_info
1300 .push(InitAnchorSource::FromCache(AnchorInfo::from_anchor(
1301 anchor,
1302 our_exts_count,
1303 )));
1304
1305 last_anchor = Some(anchor.clone());
1306 }
1307 }
1308
1309 if all_anchors_are_taken_from_cache {
1311 let Some(InitAnchorSource::FromCache(_)) = res.anchors_info.last() else {
1312 return Err(CollatorError::Anyhow(anyhow::anyhow!(
1313 "`anchors_info` should contain almost one `FromCache` here"
1314 )));
1315 };
1316
1317 return Ok(res);
1318 }
1319
1320 if !processed_to_anchor_exists_in_cache {
1322 anchors_cache.clear();
1323 metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels).set(0);
1324 }
1325
1326 let mut prev_anchor_id;
1327 let mut last_imported_anchor_ct;
1328 let mut next_anchor = match last_anchor {
1329 Some(anchor) => {
1330 prev_anchor_id = anchor.id;
1331 last_imported_anchor_ct = anchor.chain_time;
1332 None
1333 }
1334 None => {
1335 let GetAnchorResult::Exist(anchor) = mpool_adapter
1336 .get_anchor_by_id(processed_to_anchor_id)
1337 .await?
1338 else {
1339 return Err(CollatorError::Cancelled(
1340 CollationCancelReason::AnchorNotFound(processed_to_anchor_id),
1341 ));
1342 };
1343 prev_anchor_id = anchor.id;
1344 last_imported_anchor_ct = anchor.chain_time;
1345 Some(anchor)
1346 }
1347 };
1348
1349 loop {
1350 if let Some(anchor) = next_anchor {
1352 let our_exts_count = anchor.count_externals_for(&shard_id, 0);
1353
1354 if anchor.id == processed_to_anchor_id
1356 && anchor.externals.len() == processed_to_msgs_offset as usize
1357 {
1358 anchors_cache
1360 .add_imported_anchor_info(AnchorInfo::from_anchor(&anchor, our_exts_count));
1361 } else {
1362 anchors_cache.add(anchor.clone(), our_exts_count);
1364
1365 metrics::counter!("tycho_collator_ext_msgs_imported_count", &labels)
1366 .increment(our_exts_count as u64);
1367 metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
1368 .increment(our_exts_count as f64);
1369 }
1370
1371 res.anchors_info
1373 .push(InitAnchorSource::Imported(AnchorInfo::from_anchor(
1374 &anchor,
1375 our_exts_count,
1376 )));
1377 }
1378
1379 if last_imported_anchor_ct > current_shard_last_imported_chain_time {
1381 res.anchors_count_above_last_imported_in_current_shard += 1;
1382 }
1383
1384 if last_imported_anchor_ct >= last_block_chain_time {
1386 break;
1387 }
1388
1389 let GetAnchorResult::Exist(anchor) =
1390 mpool_adapter.get_next_anchor(prev_anchor_id).await?
1391 else {
1392 return Err(CollatorError::Cancelled(
1393 CollationCancelReason::NextAnchorNotFound(prev_anchor_id),
1394 ));
1395 };
1396 prev_anchor_id = anchor.id;
1397 last_imported_anchor_ct = anchor.chain_time;
1398
1399 next_anchor = Some(anchor);
1400 }
1401
1402 Ok(res)
1403 }
1404
1405 fn check_has_unprocessed_messages(&mut self, working_state: &mut WorkingState) -> Result<bool> {
1406 if let Some(has_messages) = working_state.has_unprocessed_messages {
1407 return Ok(has_messages);
1408 }
1409
1410 if working_state
1413 .reader_state
1414 .check_has_non_zero_processed_offset()
1415 {
1416 working_state.has_unprocessed_messages = Some(true);
1417 return Ok(true);
1418 }
1419
1420 if working_state.reader_state.has_messages_in_buffers() {
1423 working_state.has_unprocessed_messages = Some(true);
1424 return Ok(true);
1425 }
1426
1427 let msgs_exec_params = MsgsExecutionParamsStuff::create(
1430 working_state
1431 .prev_shard_data_ref()
1432 .processed_upto()
1433 .msgs_exec_params
1434 .clone(),
1435 working_state.collation_config.msgs_exec_params.clone(),
1436 );
1437
1438 let mut temp_anchors_cache = AnchorsCache::default();
1440
1441 let mut tx = AnchorsCacheTransaction::new(&mut temp_anchors_cache);
1442
1443 let for_shard_id = working_state.next_block_id_short.shard;
1445 let block_seqno = working_state.next_block_id_short.seqno;
1446 let mc_state_gen_lt = working_state.mc_data.gen_lt;
1447 let prev_state_gen_lt = working_state.prev_shard_data_ref().gen_lt();
1448 let mc_top_shards_end_lts: Vec<_> = working_state
1449 .mc_data
1450 .shards
1451 .iter()
1452 .map(|(k, v)| (*k, v.end_lt))
1453 .collect();
1454 let is_first = is_first_block_after_prev_master(
1455 working_state.prev_shard_data_ref().blocks_ids()[0], &working_state.mc_data.shards,
1457 );
1458
1459 let mut messages_reader = MessagesReader::new(
1461 MessagesReaderContext {
1462 for_shard_id,
1463 block_seqno,
1464 next_chain_time: 0,
1465 msgs_exec_params,
1466 mc_state_gen_lt,
1467 prev_state_gen_lt,
1468 mc_top_shards_end_lts,
1469 cumulative_stats_calc_params: None,
1470 reader_state: &mut working_state.reader_state,
1472 anchors_cache: &mut tx,
1475 is_first_block_after_prev_master: is_first,
1476 part_stat_ranges: None,
1477 },
1478 self.mq_adapter.clone(),
1479 )?;
1480
1481 let has_pending_internals = messages_reader.check_has_pending_internals_in_iterators()?;
1483
1484 if working_state.next_block_id_short.is_masterchain() {
1487 messages_reader.drop_internals_next_range_readers();
1488 } else if !has_pending_internals {
1489 messages_reader.drop_internals_next_range_readers();
1494 }
1495
1496 let _ = messages_reader.finalize(
1498 0, &Default::default(),
1500 )?;
1501
1502 working_state.has_unprocessed_messages = Some(has_pending_internals);
1503
1504 Ok(has_pending_internals)
1505 }
1506
1507 async fn wait_state_and_try_collate_wrapper(&mut self) -> Result<()> {
1508 Box::pin(self.wait_state_and_try_collate(false))
1509 .await
1510 .with_context(|| format!("next_block_id: {}", self.next_block_info))
1511 }
1512
1513 async fn wait_state_and_try_collate(&mut self, force_one_anchor_import: bool) -> Result<()> {
1514 let working_state = self.delayed_working_state.wait().await?;
1515
1516 if self.shard_id.is_masterchain() {
1517 Box::pin(
1518 self.try_collate_next_master_block_impl(working_state, force_one_anchor_import),
1519 )
1520 .await
1521 } else {
1522 Box::pin(self.try_collate_next_shard_block_impl(working_state, force_one_anchor_import))
1523 .await
1524 }
1525 }
1526
1527 async fn wait_state_and_do_collate_wrapper(
1528 &mut self,
1529 top_shard_blocks_info: Vec<TopBlockDescription>,
1530 next_chain_time: u64,
1531 ) -> Result<()> {
1532 Box::pin(self.wait_state_and_do_collate(top_shard_blocks_info, next_chain_time))
1533 .await
1534 .with_context(|| format!("next_block_id: {}", self.next_block_info))
1535 }
1536
1537 async fn wait_state_and_do_collate(
1538 &mut self,
1539 top_shard_blocks_info: Vec<TopBlockDescription>,
1540 next_chain_time: u64,
1541 ) -> Result<()> {
1542 let mut working_state = self.delayed_working_state.wait().await?;
1543
1544 let (mut last_imported_anchor_id, mut last_imported_chain_time) = self
1547 .anchors_cache
1548 .get_last_imported_anchor_id_and_ct()
1549 .context("should contain at least one imported anchor info here")?;
1550 if last_imported_chain_time < next_chain_time {
1551 let labels = [("workchain", self.shard_id.workchain().to_string())];
1552 let top_processed_to_anchor = working_state.mc_data.top_processed_to_anchor;
1553 let max_consensus_lag_rounds = working_state
1554 .mc_data
1555 .config
1556 .get_consensus_config()?
1557 .max_consensus_lag_rounds
1558 .get() as u32;
1559 while last_imported_chain_time < next_chain_time {
1560 let collation_cancelled = self.cancel_collation.notified();
1563 let import_fut = Self::import_next_anchor(
1564 self.shard_id,
1565 &mut self.anchors_cache,
1566 self.mpool_adapter.clone(),
1567 top_processed_to_anchor,
1568 max_consensus_lag_rounds,
1569 false,
1570 );
1571
1572 let import_anchor_result = tokio::select! {
1573 res = import_fut => res?,
1574 _ = collation_cancelled => {
1575 tracing::info!(target: tracing_targets::COLLATOR,
1576 top_processed_to_anchor,
1577 last_imported_anchor_id,
1578 last_imported_chain_time,
1579 "collation was cancelled by manager on wait_state_and_do_collate",
1580 );
1581 metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels).increment(1);
1582 self.listener
1583 .on_cancelled(
1584 working_state.mc_data.block_id,
1585 working_state.next_block_id_short,
1586 CollationCancelReason::ExternalCancel,
1587 )
1588 .await?;
1589 self.delayed_working_state.delay(working_state);
1590 return Ok(());
1591 }
1592 };
1593
1594 match import_anchor_result {
1595 ImportNextAnchor::Skipped => {
1596 anyhow::bail!(
1597 "anchor import cannot be skipped here because anchor \
1598 with next_chain_time {} should exit",
1599 next_chain_time,
1600 )
1601 }
1602 ImportNextAnchor::Result {
1603 get_anchor_result,
1604 has_our_externals,
1605 ..
1606 } => match get_anchor_result {
1607 GetAnchorResult::NotExist => {
1608 anyhow::bail!(
1609 "anchor with next_chain_time {} should exit",
1610 next_chain_time
1611 )
1612 }
1613 GetAnchorResult::Exist(next_anchor) => {
1614 let elapsed_from_prev_anchor = self.anchor_timer.elapsed();
1616 self.anchor_timer = std::time::Instant::now();
1617 metrics::histogram!(
1618 "tycho_collator_from_prev_anchor_time_high",
1619 &labels
1620 )
1621 .record(elapsed_from_prev_anchor);
1622
1623 working_state.wu_used_from_last_anchor = 0;
1624
1625 let elapsed_between_anchors = next_anchor
1627 .chain_time
1628 .saturating_sub(last_imported_chain_time);
1629 metrics::histogram!(
1630 "tycho_collator_between_anchors_time_high",
1631 &labels,
1632 )
1633 .record(Duration::from_millis(elapsed_between_anchors));
1634
1635 last_imported_anchor_id = next_anchor.id;
1636 last_imported_chain_time = next_anchor.chain_time;
1637
1638 tracing::debug!(target: tracing_targets::COLLATOR,
1639 top_processed_to_anchor,
1640 last_imported_anchor_id,
1641 last_imported_chain_time,
1642 has_externals = has_our_externals,
1643 "imported next anchor to reach next_chain_time",
1644 );
1645 }
1646 },
1647 }
1648 }
1649 }
1650
1651 Box::pin(self.do_collate(
1652 working_state,
1653 Some(top_shard_blocks_info),
1654 next_chain_time,
1655 ForceMasterCollation::No,
1656 ))
1657 .await
1658 }
1659
1660 #[tracing::instrument(
1664 parent = None, name = "try_collate_next_master_block",
1665 skip_all, fields(
1666 next_block_id = %self.next_block_info,
1667 mc_data_block_id = %working_state.mc_data.block_id.as_short_id(),
1668 )
1669 )]
1670 async fn try_collate_next_master_block_impl(
1671 &mut self,
1672 mut working_state: Box<WorkingState>,
1673 mut force_one_anchor_import: bool,
1674 ) -> Result<()> {
1675 tracing::debug!(target: tracing_targets::COLLATOR,
1676 force_one_anchor_import,
1677 "Check if can collate next master block",
1678 );
1679
1680 let labels = [("workchain", self.shard_id.workchain().to_string())];
1681 let _histogram = HistogramGuardWithLabels::begin(
1682 "tycho_collator_try_collate_next_master_block_time",
1683 &labels,
1684 );
1685
1686 let top_processed_to_anchor = working_state.mc_data.top_processed_to_anchor;
1687
1688 let (mut last_imported_anchor_id, mut last_imported_chain_time) = self
1689 .anchors_cache
1690 .get_last_imported_anchor_id_and_ct()
1691 .unwrap_or_default();
1692
1693 let has_unprocessed_messages = self.check_has_unprocessed_messages(&mut working_state)?;
1695
1696 let force_mc_collation =
1698 has_unprocessed_messages.then_some(ForceMasterCollation::ByUnprocessedMessages);
1699
1700 match (has_unprocessed_messages, force_one_anchor_import) {
1701 (true, false) => {
1702 tracing::info!(target: tracing_targets::COLLATOR,
1704 top_processed_to_anchor,
1705 last_imported_anchor_id,
1706 last_imported_chain_time,
1707 "there are unprocessed messages from previous block, will collate next block",
1708 );
1709
1710 self.listener
1711 .on_skipped(
1712 working_state.mc_data.block_id,
1713 working_state.next_block_id_short,
1714 working_state.prev_shard_data_ref().gen_chain_time(),
1715 force_mc_collation.expect(
1716 "should be Some(..) here because of `has_unprocessed_messages = true`",
1717 ),
1718 working_state.collation_config.clone(),
1719 )
1720 .await?;
1721 self.delayed_working_state.delay(working_state);
1722 return Ok(());
1723 }
1724 (false, _) => {
1725 tracing::debug!(target: tracing_targets::COLLATOR,
1726 top_processed_to_anchor,
1727 last_imported_anchor_id,
1728 last_imported_chain_time,
1729 "there are no unprocessed messages, will import next anchor",
1730 );
1731 }
1732 (_, true) => {
1733 tracing::info!(target: tracing_targets::COLLATOR,
1734 force_one_anchor_import,
1735 "will import next anchor",
1736 );
1737 }
1738 }
1739
1740 let collation_cancelled = self.cancel_collation.notified();
1745 let import_fut = Self::import_next_anchor(
1746 self.shard_id,
1747 &mut self.anchors_cache,
1748 self.mpool_adapter.clone(),
1749 working_state.mc_data.top_processed_to_anchor,
1750 working_state
1751 .mc_data
1752 .config
1753 .get_consensus_config()?
1754 .max_consensus_lag_rounds
1755 .get() as u32,
1756 std::mem::take(&mut force_one_anchor_import),
1757 );
1758
1759 let import_anchor_result = tokio::select! {
1760 res = import_fut => res?,
1761 _ = collation_cancelled => {
1762 tracing::info!(target: tracing_targets::COLLATOR,
1763 top_processed_to_anchor,
1764 last_imported_anchor_id,
1765 last_imported_chain_time,
1766 "collation was cancelled by manager on try_collate_next_master_block",
1767 );
1768 metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels).increment(1);
1769 self.listener
1770 .on_cancelled(
1771 working_state.mc_data.block_id,
1772 working_state.next_block_id_short,
1773 CollationCancelReason::ExternalCancel,
1774 )
1775 .await?;
1776 self.delayed_working_state.delay(working_state);
1777 return Ok(());
1778 }
1779 };
1780
1781 match import_anchor_result {
1782 ImportNextAnchor::Result {
1783 prev_anchor_id,
1784 get_anchor_result,
1785 has_our_externals,
1786 ..
1787 } => match get_anchor_result {
1788 GetAnchorResult::NotExist => {
1789 tracing::warn!(target: tracing_targets::COLLATOR,
1790 top_processed_to_anchor,
1791 last_imported_anchor_id,
1792 last_imported_chain_time,
1793 "next anchor not exist, cancel collation attempts",
1794 );
1795 self.listener
1796 .on_cancelled(
1797 working_state.mc_data.block_id,
1798 working_state.next_block_id_short,
1799 CollationCancelReason::NextAnchorNotFound(prev_anchor_id),
1800 )
1801 .await?;
1802 self.delayed_working_state.delay(working_state);
1803 return Ok(());
1804 }
1805 GetAnchorResult::Exist(next_anchor) => {
1806 let elapsed_from_prev_anchor = self.anchor_timer.elapsed();
1808 self.anchor_timer = std::time::Instant::now();
1809 metrics::histogram!("tycho_collator_from_prev_anchor_time_high", &labels)
1810 .record(elapsed_from_prev_anchor);
1811
1812 working_state.wu_used_from_last_anchor = 0;
1813
1814 let elapsed_between_anchors = next_anchor.chain_time - last_imported_chain_time;
1816 metrics::histogram!("tycho_collator_between_anchors_time_high", &labels,)
1817 .record(Duration::from_millis(elapsed_between_anchors));
1818
1819 last_imported_anchor_id = next_anchor.id;
1820 last_imported_chain_time = next_anchor.chain_time;
1821
1822 tracing::debug!(target: tracing_targets::COLLATOR,
1823 force_mc_block = false,
1824 top_processed_to_anchor,
1825 last_imported_anchor_id,
1826 last_imported_chain_time,
1827 has_externals = has_our_externals,
1828 "imported next anchor, will notify collation manager",
1829 );
1830
1831 self.listener
1833 .on_skipped(
1834 working_state.mc_data.block_id,
1835 working_state.next_block_id_short,
1836 next_anchor.chain_time,
1837 force_mc_collation.unwrap_or(ForceMasterCollation::No),
1838 working_state.collation_config.clone(),
1839 )
1840 .await?;
1841
1842 self.delayed_working_state.delay(working_state);
1843 }
1844 },
1845 ImportNextAnchor::Skipped => {
1846 tracing::debug!(target: tracing_targets::COLLATOR,
1847 force_mc_block = true,
1848 top_processed_to_anchor,
1849 last_imported_anchor_id,
1850 last_imported_chain_time,
1851 "mempool paused, will notify collation manager",
1852 );
1853
1854 self.listener
1856 .on_skipped(
1857 working_state.mc_data.block_id,
1858 working_state.next_block_id_short,
1859 last_imported_chain_time,
1860 force_mc_collation.unwrap_or(ForceMasterCollation::ByAnchorImportSkipped),
1861 working_state.collation_config.clone(),
1862 )
1863 .await?;
1864
1865 self.delayed_working_state.delay(working_state);
1866 }
1867 }
1868
1869 Ok(())
1870 }
1871
1872 #[tracing::instrument(
1880 parent = None, name = "try_collate_next_shard_block",
1881 skip_all, fields(
1882 next_block_id = %self.next_block_info,
1883 mc_data_block_id = %working_state.mc_data.block_id.as_short_id(),
1884 )
1885 )]
1886 async fn try_collate_next_shard_block_impl(
1887 &mut self,
1888 mut working_state: Box<WorkingState>,
1889 mut force_one_anchor_import: bool,
1890 ) -> Result<()> {
1891 tracing::debug!(target: tracing_targets::COLLATOR,
1892 force_one_anchor_import,
1893 "Check if can collate next shard block",
1894 );
1895
1896 let labels = [("workchain", self.shard_id.workchain().to_string())];
1897 let histogram = HistogramGuardWithLabels::begin(
1898 "tycho_collator_try_collate_next_shard_block_time",
1899 &labels,
1900 );
1901
1902 let top_processed_to_anchor = working_state.mc_data.top_processed_to_anchor;
1903
1904 let mut last_committed_seqno = 0;
1906 for (shard_id, shard_descr) in working_state.mc_data.shards.iter() {
1907 if *shard_id == self.shard_id {
1908 last_committed_seqno = shard_descr.seqno;
1909 }
1910 }
1911 let uncommitted_chain_length =
1912 working_state.next_block_id_short.seqno - 1 - last_committed_seqno;
1913 let max_uncommitted_chain_length =
1914 working_state.collation_config.max_uncommitted_chain_length as u32;
1915
1916 let force_mc_block_by_uncommitted_chain =
1917 uncommitted_chain_length >= max_uncommitted_chain_length;
1918
1919 #[derive(Debug)]
1921 enum TryCollateCheck {
1922 ForceMcBlockByUncommittedChainLength,
1923 HasUnprocessedMessages,
1924 HasExternals,
1925 NoPendingMessages,
1926 ForceEmptyShardBlock,
1927 }
1928 let mut anchor_import_skipped = false;
1929 let mut try_collate_check = 'check: {
1930 {
1931 let has_uprocessed_messages =
1933 self.check_has_unprocessed_messages(&mut working_state)?;
1934
1935 let has_externals = self.anchors_cache.has_pending_externals();
1937
1938 let mc_block_max_interval_threshold = working_state.mc_data.gen_chain_time
1940 + working_state.collation_config.mc_block_max_interval_ms as u64;
1941
1942 let wu_used_from_last_anchor = working_state.wu_used_from_last_anchor;
1944 let wu_used_to_import_next_anchor =
1945 working_state.collation_config.wu_used_to_import_next_anchor;
1946 let should_import_anchor_by_used_wu =
1947 wu_used_from_last_anchor >= wu_used_to_import_next_anchor;
1948
1949 metrics::gauge!("tycho_collator_wu_used_from_last_anchor", &[(
1951 "type",
1952 "wu_one_anchor"
1953 ),])
1954 .set(wu_used_to_import_next_anchor as f64);
1955 metrics::gauge!("tycho_collator_wu_used_from_last_anchor", &[(
1956 "type",
1957 "wu_used_sum"
1958 ),])
1959 .set(wu_used_from_last_anchor as f64);
1960 let can_import_anchors_count =
1961 wu_used_from_last_anchor.saturating_div(wu_used_to_import_next_anchor);
1962 metrics::gauge!("tycho_collator_can_import_anchors_count")
1963 .set(can_import_anchors_count as f64);
1964
1965 let (mut last_imported_anchor_id, mut last_imported_chain_time) = self
1967 .anchors_cache
1968 .get_last_imported_anchor_id_and_ct()
1969 .unwrap_or_default();
1970 match (
1971 has_uprocessed_messages,
1972 has_externals,
1973 should_import_anchor_by_used_wu,
1974 force_one_anchor_import,
1975 ) {
1976 _ if force_mc_block_by_uncommitted_chain => {
1979 tracing::debug!(target: tracing_targets::COLLATOR,
1980 top_processed_to_anchor,
1981 last_imported_anchor_id,
1982 last_imported_chain_time,
1983 uncommitted_chain_length,
1984 max_uncommitted_chain_length,
1985 "uncommitted chain limit reached, will import next anchor and then force master collation",
1986 );
1987 }
1988 (true, _, false, false) => {
1989 break 'check TryCollateCheck::HasUnprocessedMessages;
1990 }
1991 (_, true, false, false) => break 'check TryCollateCheck::HasExternals,
1992 (false, false, false, false) => {
1993 tracing::debug!(target: tracing_targets::COLLATOR,
1994 top_processed_to_anchor,
1995 last_imported_anchor_id,
1996 last_imported_chain_time,
1997 "there are no pending internals or externals, will import next anchor",
1998 );
1999
2000 working_state.wu_used_from_last_anchor = 0;
2002 }
2003 (_, _, true, false) => {
2004 tracing::info!(target: tracing_targets::COLLATOR,
2005 top_processed_to_anchor,
2006 last_imported_anchor_id,
2007 last_imported_chain_time,
2008 "wu used from last anchor {} reached limit {} on length {}, will import next anchor",
2009 wu_used_from_last_anchor, wu_used_to_import_next_anchor, uncommitted_chain_length,
2010 );
2011 }
2012 (_, _, _, true) => {
2013 tracing::info!(target: tracing_targets::COLLATOR,
2014 force_one_anchor_import,
2015 "will import next anchor",
2016 );
2017 }
2018 }
2019
2020 let max_consensus_lag_rounds = working_state
2021 .mc_data
2022 .config
2023 .get_consensus_config()?
2024 .max_consensus_lag_rounds
2025 .get() as u32;
2026
2027 let mut imported_anchors_count = 0;
2028 let mut imported_anchors_has_externals = false;
2029
2030 loop {
2032 let collation_cancelled = self.cancel_collation.notified();
2035 let import_fut = Self::import_next_anchor(
2036 self.shard_id,
2037 &mut self.anchors_cache,
2038 self.mpool_adapter.clone(),
2039 working_state.mc_data.top_processed_to_anchor,
2040 max_consensus_lag_rounds,
2041 std::mem::take(&mut force_one_anchor_import),
2042 );
2043
2044 let import_anchor_result = tokio::select! {
2045 res = import_fut => res?,
2046 _ = collation_cancelled => {
2047 tracing::info!(target: tracing_targets::COLLATOR,
2048 top_processed_to_anchor,
2049 last_imported_anchor_id,
2050 last_imported_chain_time,
2051 "collation was cancelled by manager on try_collate_next_shard_block",
2052 );
2053 metrics::counter!("tycho_collator_anchor_import_cancelled_count", &labels).increment(1);
2054 self.listener
2055 .on_cancelled(
2056 working_state.mc_data.block_id,
2057 working_state.next_block_id_short,
2058 CollationCancelReason::ExternalCancel,
2059 )
2060 .await?;
2061 self.delayed_working_state.delay(working_state);
2062 return Ok(());
2063 }
2064 };
2065
2066 match import_anchor_result {
2067 ImportNextAnchor::Result {
2068 prev_anchor_id,
2069 get_anchor_result,
2070 has_our_externals,
2071 requested_at,
2072 } => match get_anchor_result {
2073 GetAnchorResult::NotExist => {
2074 tracing::warn!(target: tracing_targets::COLLATOR,
2076 top_processed_to_anchor,
2077 last_imported_anchor_id,
2078 last_imported_chain_time,
2079 "next anchor not exist, cancel collation attempts",
2080 );
2081 self.listener
2082 .on_cancelled(
2083 working_state.mc_data.block_id,
2084 working_state.next_block_id_short,
2085 CollationCancelReason::NextAnchorNotFound(prev_anchor_id),
2086 )
2087 .await?;
2088 self.delayed_working_state.delay(working_state);
2089 return Ok(());
2090 }
2091 GetAnchorResult::Exist(anchor) => {
2092 imported_anchors_count += 1;
2093
2094 let elapsed_from_prev_anchor = self.anchor_timer.elapsed();
2096 self.anchor_timer = std::time::Instant::now();
2097 metrics::histogram!(
2098 "tycho_collator_from_prev_anchor_time_high",
2099 &labels
2100 )
2101 .record(elapsed_from_prev_anchor);
2102
2103 let elapsed_between_anchors =
2105 anchor.chain_time - last_imported_chain_time;
2106 metrics::histogram!(
2107 "tycho_collator_between_anchors_time_high",
2108 &labels,
2109 )
2110 .record(Duration::from_millis(elapsed_between_anchors));
2111
2112 last_imported_anchor_id = anchor.id;
2113 last_imported_chain_time = anchor.chain_time;
2114
2115 metrics::gauge!("tycho_collator_shard_blocks_count_btw_anchors")
2116 .set(self.shard_blocks_count_from_last_anchor);
2117 self.shard_blocks_count_from_last_anchor = 0;
2118
2119 let lag = work_units::MempoolAnchorLag {
2121 requested_at,
2122 chain_time: anchor.chain_time,
2123 };
2124 let prev_block_seqno = working_state.next_block_id_short.seqno - 1;
2125 if let Some(sender) = &self.wu_tuner_event_sender {
2126 if let Err(err) = sender
2127 .send(work_units::WuEvent {
2128 shard: self.shard_id,
2129 seqno: prev_block_seqno,
2130 data: work_units::WuEventData::AnchorLag(lag),
2131 })
2132 .await
2133 {
2134 tracing::warn!(target: tracing_targets::COLLATOR,
2135 ?err,
2136 "error sending anchor lag to the tuner service",
2137 );
2138 }
2139 } else {
2140 work_units::report_anchor_lag_to_metrics(
2141 &self.shard_id,
2142 lag.lag(),
2143 );
2144 }
2145
2146 imported_anchors_has_externals |= has_our_externals;
2147
2148 working_state.wu_used_from_last_anchor = working_state
2150 .wu_used_from_last_anchor
2151 .saturating_sub(wu_used_to_import_next_anchor);
2152
2153 tracing::debug!(target: tracing_targets::COLLATOR,
2154 wu_used_from_last_anchor = working_state.wu_used_from_last_anchor,
2155 wu_used_to_import_next_anchor,
2156 "used wu dropped to",
2157 );
2158
2159 if working_state.wu_used_from_last_anchor
2160 < wu_used_to_import_next_anchor
2161 {
2162 break;
2163 }
2164
2165 if anchor.chain_time >= mc_block_max_interval_threshold {
2172 tracing::debug!(target: tracing_targets::COLLATOR,
2173 anchor_chain_time = anchor.chain_time,
2174 mc_block_max_interval_threshold,
2175 "stop importing anchors when mc block max interval threshold reached",
2176 );
2177 break;
2178 }
2179 }
2180 },
2181 ImportNextAnchor::Skipped => {
2182 anchor_import_skipped = true;
2183 break;
2184 }
2185 }
2186 }
2187
2188 metrics::gauge!("tycho_collator_import_next_anchor_count")
2189 .set(imported_anchors_count);
2190
2191 if imported_anchors_has_externals {
2192 tracing::debug!(target: tracing_targets::COLLATOR,
2193 "just imported anchors has externals",
2194 );
2195 }
2196
2197 let has_externals = has_externals || imported_anchors_has_externals;
2198
2199 match (has_uprocessed_messages, has_externals) {
2200 _ if force_mc_block_by_uncommitted_chain => {
2202 TryCollateCheck::ForceMcBlockByUncommittedChainLength
2203 }
2204 (true, _) => TryCollateCheck::HasUnprocessedMessages,
2205 (false, true) => TryCollateCheck::HasExternals,
2206 (false, false) => TryCollateCheck::NoPendingMessages,
2207 }
2208 }
2209 };
2210
2211 let (last_imported_anchor_id, last_imported_chain_time) = self
2212 .anchors_cache
2213 .get_last_imported_anchor_id_and_ct()
2214 .unwrap();
2215 let prev_block_chain_time = working_state.prev_shard_data_ref().gen_chain_time();
2216
2217 if matches!(try_collate_check, TryCollateCheck::NoPendingMessages) {
2220 let empty_sc_block_interval_ms =
2221 working_state.collation_config.empty_sc_block_interval_ms as u64;
2222 let ct_elapsed_from_prev_block =
2223 last_imported_chain_time.saturating_sub(prev_block_chain_time);
2224 if empty_sc_block_interval_ms > 0
2225 && ct_elapsed_from_prev_block >= empty_sc_block_interval_ms
2226 {
2227 try_collate_check = TryCollateCheck::ForceEmptyShardBlock;
2228 }
2229 }
2230
2231 match try_collate_check {
2233 TryCollateCheck::HasUnprocessedMessages
2234 | TryCollateCheck::HasExternals
2235 | TryCollateCheck::ForceEmptyShardBlock => {
2236 tracing::debug!(target: tracing_targets::COLLATOR,
2237 reason = ?try_collate_check,
2238 anchor_import_skipped,
2239 top_processed_to_anchor,
2240 last_imported_anchor_id,
2241 last_imported_chain_time,
2242 prev_block_chain_time,
2243 "will collate next shard block",
2244 );
2245
2246 let force_next_mc_block = if anchor_import_skipped && uncommitted_chain_length > 4 {
2249 ForceMasterCollation::ByAnchorImportSkipped
2250 } else {
2251 ForceMasterCollation::No
2252 };
2253
2254 drop(histogram);
2255
2256 Box::pin(self.do_collate(
2257 working_state,
2258 None,
2259 last_imported_chain_time,
2260 force_next_mc_block,
2261 ))
2262 .await?;
2263 }
2264 TryCollateCheck::NoPendingMessages
2265 | TryCollateCheck::ForceMcBlockByUncommittedChainLength => {
2266 tracing::debug!(target: tracing_targets::COLLATOR,
2267 reason = ?try_collate_check,
2268 anchor_import_skipped,
2269 force_mc_block_by_uncommitted_chain,
2270 top_processed_to_anchor,
2271 last_imported_anchor_id,
2272 last_imported_chain_time,
2273 prev_block_chain_time,
2274 uncommitted_chain_length,
2275 max_uncommitted_chain_length,
2276 "will NOT collate next shard block, will notify collation manager",
2277 );
2278
2279 let force_mc_block = match try_collate_check {
2283 TryCollateCheck::ForceMcBlockByUncommittedChainLength => {
2284 ForceMasterCollation::ByUncommittedChain
2285 }
2286 TryCollateCheck::NoPendingMessages if anchor_import_skipped => {
2287 ForceMasterCollation::ByAnchorImportSkipped
2288 }
2289 TryCollateCheck::NoPendingMessages if uncommitted_chain_length >= 1 => {
2290 ForceMasterCollation::NoPendingMessagesAfterShardBlocks
2291 }
2292 _ => ForceMasterCollation::No,
2293 };
2294
2295 self.listener
2296 .on_skipped(
2297 working_state.mc_data.block_id,
2298 working_state.next_block_id_short,
2299 last_imported_chain_time,
2300 force_mc_block,
2301 working_state.collation_config.clone(),
2302 )
2303 .await?;
2304
2305 self.delayed_working_state.delay(working_state);
2306 }
2307 }
2308
2309 Ok(())
2310 }
2311}
2312
2313struct DelayedWorkingState {
2314 shard_id: ShardIdent,
2315 future: Option<DelayedWorkingStateFut>,
2316 unused: Option<Box<WorkingState>>,
2317}
2318
2319type DelayedWorkingStateFut =
2320 Pin<Box<dyn Future<Output = Result<Box<WorkingState>>> + Send + Sync + 'static>>;
2321
2322impl DelayedWorkingState {
2323 fn new<F>(shard_id: ShardIdent, f: F) -> Self
2324 where
2325 F: Future<Output = Result<Box<WorkingState>>> + Send + Sync + 'static,
2326 {
2327 Self {
2328 shard_id,
2329 future: Some(Box::pin(f)),
2330 unused: None,
2331 }
2332 }
2333
2334 async fn wait(&mut self) -> Result<Box<WorkingState>> {
2335 let labels = [("workchain", self.shard_id.workchain().to_string())];
2336 let _histogram = HistogramGuardWithLabels::begin(
2337 "tycho_collator_wait_for_working_state_time_high",
2338 &labels,
2339 );
2340
2341 if let Some(state) = self.unused.take() {
2342 return Ok(state);
2343 }
2344
2345 if let Some(fut) = self.future.take() {
2346 return fut.await;
2347 }
2348
2349 anyhow::bail!("No pending working state found");
2350 }
2351
2352 fn delay(&mut self, state: Box<WorkingState>) {
2353 self.unused = Some(state);
2354 }
2355
2356 fn reset(&mut self) {
2357 self.unused = None;
2358 self.future = None;
2359 }
2360}
2361
2362struct HandleGenesisResult {
2363 anchors_proc_info_opt: Option<AnchorsProcessingInfo>,
2364 genesis_info: GenesisInfo,
2365 genesis_updated: bool,
2367}
2368
2369struct AnchorsProcessingInfo {
2370 pub processed_to_anchor_id: MempoolAnchorId,
2375 pub processed_to_msgs_offset: u64,
2380 pub last_imported_chain_time: u64,
2383 pub last_imported_in_block_id: BlockId,
2385 pub current_shard_last_imported_chain_time: u64,
2391}
2392impl std::fmt::Debug for AnchorsProcessingInfo {
2393 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2394 std::fmt::Display::fmt(&self, f)
2395 }
2396}
2397impl std::fmt::Display for AnchorsProcessingInfo {
2398 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2399 f.debug_struct("AnchorsProcessingInfo")
2400 .field("processed_to_anchor_id", &self.processed_to_anchor_id)
2401 .field("processed_to_msgs_offset", &self.processed_to_msgs_offset)
2402 .field("last_imported_chain_time", &self.last_imported_chain_time)
2403 .field(
2404 "last_imported_in_block_id",
2405 &DebugDisplay(&self.last_imported_in_block_id),
2406 )
2407 .field(
2408 "current_shard_last_imported_chain_time",
2409 &self.current_shard_last_imported_chain_time,
2410 )
2411 .finish()
2412 }
2413}
2414
2415enum ImportNextAnchor {
2416 Result {
2417 prev_anchor_id: MempoolAnchorId,
2418 get_anchor_result: GetAnchorResult,
2419 has_our_externals: bool,
2420 requested_at: u64,
2421 },
2422 Skipped,
2423}
2424
2425#[derive(Debug)]
2426#[allow(dead_code)]
2427enum InitAnchorSource {
2428 FromCache(AnchorInfo),
2429 Imported(AnchorInfo),
2430}
2431
2432#[derive(Default)]
2433struct ImportInitAnchorsResult {
2434 anchors_info: Vec<InitAnchorSource>,
2435 anchors_count_above_last_imported_in_current_shard: usize,
2439}
2440
2441fn max_anchors_processing_lag_rounds<T: Into<u32>>(max_consensus_lag_rounds: T) -> u32 {
2445 max_consensus_lag_rounds
2446 .into()
2447 .saturating_mul(2)
2448 .saturating_div(3)
2449}
2450
2451#[derive(PartialEq, Eq)]
2452enum NextCollationFlowStep {
2453 Continue,
2454 Cancel,
2455}
2456
2457impl NextCollationFlowStep {
2458 pub fn should_cancel(&self) -> bool {
2459 *self == Self::Cancel
2460 }
2461}