Skip to main content

tycho_collator/
state_node.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use anyhow::{Context, Result, anyhow};
6use async_trait::async_trait;
7use parking_lot::Mutex;
8use tokio::sync::{broadcast, watch};
9use tycho_block_util::block::{BlockProofStuff, BlockStuff, BlockStuffAug};
10use tycho_block_util::queue::QueueDiffStuff;
11use tycho_block_util::state::ShardStateStuff;
12use tycho_core::global_config::ZerostateId;
13use tycho_core::storage::{
14    BlockHandle, BlockInfoForApply, CoreStorage, LoadStateHint, MaybeExistingHandle, NewBlockMeta,
15    StoreStateHint,
16};
17use tycho_network::PeerId;
18use tycho_types::boc::BocRepr;
19use tycho_types::merkle::{MerkleProof, MerkleUpdate};
20use tycho_types::models::*;
21use tycho_types::prelude::*;
22use tycho_util::mem::Reclaimer;
23use tycho_util::metrics::HistogramGuard;
24use tycho_util::sync::rayon_run;
25use tycho_util::{FastDashMap, FastHashMap};
26
27use crate::tracing_targets;
28use crate::types::processed_upto::BlockSeqno;
29use crate::types::{ArcSignature, BlockStuffForSync};
30
31// FACTORY
32
33pub trait StateNodeAdapterFactory {
34    type Adapter: StateNodeAdapter;
35
36    fn create(&self, listener: Arc<dyn StateNodeEventListener>) -> Self::Adapter;
37}
38
39impl<F, R> StateNodeAdapterFactory for F
40where
41    F: Fn(Arc<dyn StateNodeEventListener>) -> R,
42    R: StateNodeAdapter,
43{
44    type Adapter = R;
45
46    fn create(&self, listener: Arc<dyn StateNodeEventListener>) -> Self::Adapter {
47        self(listener)
48    }
49}
50
51#[async_trait]
52pub trait StateNodeEventListener: Send + Sync {
53    /// When our collated block was accepted and applied
54    async fn on_block_accepted(&self, state: &ShardStateStuff) -> Result<()>;
55    /// When new block was received and applied from blockchain
56    async fn on_block_accepted_external(
57        &self,
58        mc_block_id: &BlockId,
59        state: &ShardStateStuff,
60    ) -> Result<()>;
61}
62
63#[async_trait]
64pub trait StateNodeAdapter: Send + Sync + 'static {
65    /// Return id of last master block that was applied to node local state
66    fn load_last_applied_mc_block_id(&self) -> Result<BlockId>;
67    /// Return master or shard state on specified block from node local state
68    async fn load_state(
69        &self,
70        ref_by_mc_seqno: u32,
71        block_id: &BlockId,
72        hint: LoadStateHint,
73    ) -> Result<ShardStateStuff>;
74    /// Store shard state root in the storage.
75    /// Returns `true` when state was updated in storage.
76    async fn begin_store_next_state(
77        &self,
78        prev_block_id: &BlockId,
79        block_id: &BlockId,
80        meta: NewBlockMeta,
81        merkle_update: &MerkleUpdate,
82        state: ShardStateStuff,
83        hint: StoreStateHint,
84    ) -> Result<Box<dyn InitiatedStoreState>>;
85    /// Return block by its id from node local state
86    async fn load_block(&self, block_id: &BlockId) -> Result<Option<BlockStuff>>;
87    /// Return block by its handle from node local state
88    async fn load_block_by_handle(&self, handle: &BlockHandle) -> Result<Option<BlockStuff>>;
89    /// Return block handle by its id from node local state
90    async fn load_block_handle(&self, block_id: &BlockId) -> Result<Option<BlockHandle>>;
91    /// Return `ref_by_mc_seqno` from block handle by its id from node local state
92    async fn get_ref_by_mc_seqno(&self, block_id: &BlockId) -> Result<Option<BlockSeqno>>;
93    /// Accept block:
94    /// 1. (TODO) Broadcast block to blockchain network
95    /// 2. Provide block to the block strider
96    fn accept_block(&self, block: Arc<BlockStuffForSync>) -> Result<()>;
97    /// Fill the shard blocks cache is used for building new states by applying Merkle updates
98    fn fill_shard_blocks_cache(&self, ref_by_mc_seqno: u32, block: BlockStuff) -> Result<()>;
99    /// Waits for the specified block to be received and returns it
100    async fn wait_for_block(&self, block_id: &BlockId) -> Option<Result<BlockStuffAug>>;
101    /// Waits for the specified block by `prev_id` to be received and returns it
102    async fn wait_for_block_next(&self, prev_id: &BlockId) -> Option<Result<BlockStuffAug>>;
103    /// Handle state after block was applied
104    async fn handle_state(&self, mc_block_id: &BlockId, state: &ShardStateStuff) -> Result<()>;
105    /// Load queue diff
106    async fn load_diff(&self, block_id: &BlockId) -> Result<Option<QueueDiffStuff>>;
107    /// Handle sync context update
108    fn set_sync_context(&self, sync_context: CollatorSyncContext);
109    fn load_init_block_id(&self) -> Option<BlockId>;
110    fn zerostate_id(&self) -> &ZerostateId;
111    fn shard_split_depth(&self) -> u8;
112}
113
114#[async_trait]
115pub trait InitiatedStoreState: Send + 'static {
116    async fn wait_store_only(self: Box<Self>) -> Result<()>;
117    async fn wait_reload(self: Box<Self>) -> Result<ShardStateStuff>;
118}
119
120pub struct StateNodeAdapterStdImpl {
121    listener: Arc<dyn StateNodeEventListener>,
122    blocks: FastDashMap<ShardIdent, BTreeMap<u32, Arc<BlockStuffForSync>>>,
123    shard_blocks: Arc<ShardBlocksMap>,
124    storage: CoreStorage,
125    broadcaster: broadcast::Sender<BlockId>,
126
127    sync_context_tx: watch::Sender<CollatorSyncContext>,
128
129    delayed_state_notifier: DelayedStateNotifier,
130    zerostate_id: ZerostateId,
131}
132
133type ShardBlocksMap = FastDashMap<ShardIdent, BTreeMap<BlockId, ShardBlockData>>;
134
135impl StateNodeAdapterStdImpl {
136    pub fn new(
137        listener: Arc<dyn StateNodeEventListener>,
138        storage: CoreStorage,
139        initial_sync_context: CollatorSyncContext,
140        zerostate_id: ZerostateId,
141    ) -> Self {
142        let (sync_context_tx, mut sync_context_rx) = watch::channel(initial_sync_context);
143        let (broadcaster, _) = broadcast::channel(10000);
144
145        let adapter = Self {
146            listener,
147            storage,
148            blocks: Default::default(),
149            shard_blocks: Default::default(),
150            broadcaster,
151            sync_context_tx,
152            delayed_state_notifier: DelayedStateNotifier::default(),
153            zerostate_id,
154        };
155
156        tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "Start watching for sync context updates");
157
158        tokio::spawn({
159            let listener = adapter.listener.clone();
160            let delayed_state_notifier = adapter.delayed_state_notifier.clone();
161            async move {
162                while sync_context_rx.changed().await.is_ok() {
163                    let sync_ctx = *sync_context_rx.borrow();
164
165                    delayed_state_notifier
166                        .send_delayed_if(listener.clone(), |delayed_sync_ctx| {
167                            // send last delayed Persistent or Historical state when sync switched to recent blocks
168                            let check = sync_ctx == CollatorSyncContext::Recent
169                                && delayed_sync_ctx != CollatorSyncContext::Recent;
170                            if check {
171                                tracing::debug!(
172                                    target: tracing_targets::STATE_NODE_ADAPTER,
173                                    sync_ctx = ?sync_ctx,
174                                    delayed_sync_ctx = ?delayed_sync_ctx,
175                                    "handle_sync_context_update: will process delayed state",
176                                );
177                            } else {
178                                tracing::debug!(
179                                    target: tracing_targets::STATE_NODE_ADAPTER,
180                                    sync_ctx = ?sync_ctx,
181                                    delayed_sync_ctx = ?delayed_sync_ctx,
182                                    "handle_sync_context_update: will not process delayed state",
183                                );
184                            }
185                            check
186                        })
187                        .await
188                        .unwrap();
189                }
190            }
191        });
192
193        tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "State node adapter created");
194
195        adapter
196    }
197}
198
199#[async_trait]
200impl StateNodeAdapter for StateNodeAdapterStdImpl {
201    fn load_last_applied_mc_block_id(&self) -> Result<BlockId> {
202        let las_applied_mc_block_id = self
203            .storage
204            .node_state()
205            .load_last_mc_block_id()
206            .context("no blocks applied yet")?;
207
208        tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER,
209            "Loaded last applied mc block id {}",
210            las_applied_mc_block_id.as_short_id(),
211        );
212
213        Ok(las_applied_mc_block_id)
214    }
215
216    async fn load_state(
217        &self,
218        ref_by_mc_seqno: u32,
219        block_id: &BlockId,
220        hint: LoadStateHint,
221    ) -> Result<ShardStateStuff> {
222        let _histogram = HistogramGuard::begin("tycho_collator_state_load_state_time");
223
224        tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Load state: {}", block_id.as_short_id());
225
226        self.storage
227            .shard_state_storage()
228            .load_state_ext(ref_by_mc_seqno, block_id, hint, |block_id| {
229                self.shard_blocks
230                    .get(&block_id.shard)
231                    .and_then(|shard_blocks| {
232                        let block = shard_blocks.get(block_id)?;
233                        Some(BlockInfoForApply {
234                            prev_block_id: block.prev_id,
235                            partial_root_cell: block.state_update.new.clone(),
236                        })
237                    })
238            })
239            .await
240    }
241
242    async fn begin_store_next_state(
243        &self,
244        prev_block_id: &BlockId,
245        block_id: &BlockId,
246        meta: NewBlockMeta,
247        merkle_update: &MerkleUpdate,
248        state: ShardStateStuff,
249        hint: StoreStateHint,
250    ) -> Result<Box<dyn InitiatedStoreState>> {
251        struct HistogramOnDrop {
252            workchain: i32,
253            started_at: Instant,
254        }
255
256        impl Drop for HistogramOnDrop {
257            fn drop(&mut self) {
258                metrics::histogram!(
259                    "tycho_collator_state_store_state_root_time_high",
260                    "workchain" => self.workchain.to_string(),
261                )
262                .record(self.started_at.elapsed());
263            }
264        }
265
266        struct StoreOperation {
267            _metrics: HistogramOnDrop,
268            inner: tycho_core::storage::InitiatedStoreState,
269        }
270
271        #[async_trait]
272        impl InitiatedStoreState for StoreOperation {
273            async fn wait_store_only(self: Box<Self>) -> Result<()> {
274                self.inner.wait_store_only().await
275            }
276
277            async fn wait_reload(self: Box<Self>) -> Result<ShardStateStuff> {
278                self.inner.wait_reload().await
279            }
280        }
281
282        let metrics = HistogramOnDrop {
283            workchain: block_id.shard.workchain(),
284            started_at: Instant::now(),
285        };
286
287        tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Store state root: {}", block_id.as_short_id());
288
289        let prev_handle = self
290            .storage
291            .block_handle_storage()
292            .load_handle(prev_block_id)
293            .context("no prev block handle found")?;
294
295        let (next_handle, _) = self
296            .storage
297            .block_handle_storage()
298            .create_or_load_handle(block_id, meta);
299
300        let get_merkle_update = Box::new({
301            let shard_blocks = self.shard_blocks.clone();
302            move |block_id: &BlockId| {
303                shard_blocks.get(&block_id.shard).and_then(|shard_blocks| {
304                    let block = shard_blocks.get(block_id)?;
305                    Some(BlockInfoForApply {
306                        prev_block_id: block.prev_id,
307                        partial_root_cell: block.state_update.new.clone(),
308                    })
309                })
310            }
311        });
312
313        let inner = self.storage.shard_state_storage().begin_store_next_state(
314            &prev_handle,
315            &next_handle,
316            merkle_update,
317            Some(state),
318            hint,
319            Some(get_merkle_update),
320        )?;
321
322        Ok(Box::new(StoreOperation {
323            _metrics: metrics,
324            inner,
325        }))
326    }
327
328    async fn load_block(&self, block_id: &BlockId) -> Result<Option<BlockStuff>> {
329        let _histogram = HistogramGuard::begin("tycho_collator_state_load_block_time");
330
331        tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Load block: {}", block_id.as_short_id());
332
333        let handle_storage = self.storage.block_handle_storage();
334
335        match handle_storage.load_handle(block_id) {
336            Some(handle) => self.load_block_by_handle(&handle).await,
337            _ => Ok(None),
338        }
339    }
340
341    async fn load_block_by_handle(&self, handle: &BlockHandle) -> Result<Option<BlockStuff>> {
342        if !handle.has_data() {
343            return Ok(None);
344        }
345
346        tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Load block by handle: {}", handle.id().as_short_id());
347
348        let block_storage = self.storage.block_storage();
349        block_storage.load_block_data(handle).await.map(Some)
350    }
351
352    async fn load_block_handle(&self, block_id: &BlockId) -> Result<Option<BlockHandle>> {
353        tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Load block handle: {}", block_id.as_short_id());
354        Ok(self.storage.block_handle_storage().load_handle(block_id))
355    }
356
357    async fn get_ref_by_mc_seqno(&self, block_id: &BlockId) -> Result<Option<BlockSeqno>> {
358        Ok(self
359            .load_block_handle(block_id)
360            .await?
361            .map(|block_handle| block_handle.ref_by_mc_seqno()))
362    }
363
364    fn accept_block(&self, block: Arc<BlockStuffForSync>) -> Result<()> {
365        let block_id = *block.block_stuff_aug.id();
366
367        tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Block accepted: {}", block_id.as_short_id());
368
369        self.blocks
370            .entry(block_id.shard)
371            .or_default()
372            .insert(block_id.seqno, block);
373
374        let broadcast_result = self.broadcaster.send(block_id).ok();
375        tracing::trace!(target: tracing_targets::STATE_NODE_ADAPTER, "Block broadcast_result: {:?}", broadcast_result);
376        Ok(())
377    }
378
379    fn fill_shard_blocks_cache(&self, ref_by_mc_seqno: u32, block: BlockStuff) -> Result<()> {
380        let block_id = *block.id();
381
382        if !block_id.is_masterchain() {
383            tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Shard block accepted: {}", block_id.as_short_id());
384
385            let state_update = block.block().load_state_update()?;
386            let (prev_id, prev_id_alt) = block.construct_prev_id()?;
387
388            self.shard_blocks
389                .entry(block_id.shard)
390                .or_default()
391                .insert(block_id, ShardBlockData {
392                    prev_id,
393                    ref_by_mc_seqno,
394                    state_update,
395                    _prev_id_alt: prev_id_alt,
396                });
397        }
398
399        Ok(())
400    }
401
402    async fn wait_for_block(&self, block_id: &BlockId) -> Option<Result<BlockStuffAug>> {
403        let block_id = BlockIdToWait::Full(block_id);
404        self.wait_for_block_ext(block_id).await
405    }
406
407    async fn wait_for_block_next(&self, prev_block_id: &BlockId) -> Option<Result<BlockStuffAug>> {
408        let next_block_id_short =
409            BlockIdShort::from((prev_block_id.shard, prev_block_id.seqno + 1));
410        let block_id = BlockIdToWait::Short(&next_block_id_short);
411        self.wait_for_block_ext(block_id).await
412    }
413
414    async fn handle_state(&self, mc_block_id: &BlockId, state: &ShardStateStuff) -> Result<()> {
415        let _histogram = HistogramGuard::begin("tycho_collator_state_adapter_handle_state_time");
416
417        let sync_context = *self.sync_context_tx.borrow();
418
419        tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "handle_state: block {}", state.block_id());
420        let block_id = *state.block_id();
421        debug_assert!(!block_id.is_masterchain() || &block_id == mc_block_id);
422
423        let mut to_split = Vec::new();
424
425        let shard = block_id.shard;
426        let seqno = block_id.seqno;
427
428        {
429            let has_block = if let Some(shard_blocks) = self.blocks.get(&shard) {
430                let has_block = shard_blocks.contains_key(&seqno);
431
432                if shard.is_masterchain() {
433                    let prev_mc_block = shard_blocks
434                        .range(..=seqno)
435                        .rev()
436                        .find_map(|(&key, value)| if key < seqno { Some(value) } else { None });
437
438                    if let Some(prev_mc_block) = prev_mc_block {
439                        for id in &prev_mc_block.top_shard_blocks_ids {
440                            to_split.push((id.shard, id.seqno + 1));
441                        }
442                        to_split.push((shard, prev_mc_block.block_stuff_aug.id().seqno + 1));
443                    }
444                }
445
446                has_block
447            } else {
448                false
449            };
450
451            self.delayed_state_notifier
452                .send_or_delay(
453                    self.listener.clone(),
454                    mc_block_id,
455                    state.clone(),
456                    !has_block,
457                    sync_context,
458                    |sync_ctx| {
459                        let check = sync_ctx == CollatorSyncContext::Recent;
460                        if !check {
461                            tracing::debug!(
462                                target: tracing_targets::STATE_NODE_ADAPTER,
463                                block_id = %state.block_id().as_short_id(),
464                                sync_ctx = ?sync_context,
465                                "handle_state: will delay state",
466                            );
467                        }
468                        check
469                    },
470                )
471                .await?;
472        }
473
474        let mut to_drop = Vec::new();
475        for (shard, seqno) in &to_split {
476            if let Some(mut blocks) = self.blocks.get_mut(shard) {
477                let retained = blocks.split_off(seqno);
478                to_drop.push(std::mem::replace(&mut *blocks, retained));
479            }
480        }
481
482        // Don't wait for drop inside a tokio context.
483        Reclaimer::instance().drop(to_drop);
484
485        if shard.is_masterchain() {
486            let handle_storage = self.storage.block_handle_storage();
487
488            let mut to_drop = Vec::new();
489            for entry in state.shards()?.latest_blocks() {
490                let top_block_id = entry?;
491                let Some(mut sb) = self.shard_blocks.get_mut(&top_block_id.shard) else {
492                    continue;
493                };
494
495                // Find the oldest block that is not yet persisted to storage.
496                // Keep it and everything after to maintain the merkle update chain.
497                let retain_from = sb
498                    .range(..top_block_id)
499                    .find(|(block_id, _)| {
500                        !matches!(
501                            handle_storage.load_handle(block_id),
502                            Some(handle) if handle.has_data()
503                        )
504                    })
505                    .map_or(top_block_id, |(block_id, _)| *block_id);
506
507                let retained = sb.split_off(&retain_from);
508                to_drop.push(std::mem::replace(&mut *sb, retained));
509            }
510
511            // Don't wait for drop inside a tokio context.
512            Reclaimer::instance().drop(to_drop);
513        }
514
515        Ok(())
516    }
517
518    async fn load_diff(&self, block_id: &BlockId) -> Result<Option<QueueDiffStuff>> {
519        let _histogram = HistogramGuard::begin("tycho_collator_state_load_queue_diff_time");
520
521        tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Load queue diff: {}", block_id.as_short_id());
522
523        let handle_storage = self.storage.block_handle_storage();
524        let block_storage = self.storage.block_storage();
525
526        match handle_storage.load_handle(block_id) {
527            Some(handle) if handle.has_queue_diff() => {
528                block_storage.load_queue_diff(&handle).await.map(Some)
529            }
530            _ => Ok(None),
531        }
532    }
533
534    fn set_sync_context(&self, sync_context: CollatorSyncContext) {
535        self.sync_context_tx.send_if_modified(|curr| {
536            if *curr != sync_context {
537                *curr = sync_context;
538                true
539            } else {
540                false
541            }
542        });
543    }
544
545    fn load_init_block_id(&self) -> Option<BlockId> {
546        self.storage.node_state().load_init_mc_block_id()
547    }
548
549    fn zerostate_id(&self) -> &ZerostateId {
550        &self.zerostate_id
551    }
552
553    fn shard_split_depth(&self) -> u8 {
554        self.storage.config().shard_split_depth
555    }
556}
557
558impl StateNodeAdapterStdImpl {
559    async fn wait_for_block_ext(
560        &self,
561        block_id: BlockIdToWait<'_>,
562    ) -> Option<Result<BlockStuffAug>> {
563        let mut receiver = self.broadcaster.subscribe();
564        loop {
565            if let Some(shard_blocks) = self.blocks.get(&block_id.shard()) {
566                let block = shard_blocks.get(&block_id.seqno()).cloned();
567                drop(shard_blocks);
568
569                if let Some(block) = block {
570                    return match self.save_block_proof(&block).await {
571                        Ok(_) => Some(Ok(block.block_stuff_aug.clone())),
572                        Err(e) => Some(Err(anyhow!("failed to save block proof: {e:?}"))),
573                    };
574                }
575            }
576
577            loop {
578                match receiver.recv().await {
579                    Ok(received_block_id) if block_id == received_block_id => {
580                        break;
581                    }
582                    Ok(_) => {}
583                    Err(broadcast::error::RecvError::Lagged(count)) => {
584                        tracing::warn!(target: tracing_targets::STATE_NODE_ADAPTER, "Broadcast channel lagged: {}", count);
585                    }
586                    Err(broadcast::error::RecvError::Closed) => {
587                        tracing::error!(target: tracing_targets::STATE_NODE_ADAPTER, "Broadcast channel closed");
588                        return None;
589                    }
590                }
591            }
592        }
593    }
594
595    async fn save_block_proof(&self, block: &Arc<BlockStuffForSync>) -> Result<()> {
596        let (block_info, archive_data) = rayon_run({
597            let block = block.clone();
598            move || {
599                let PreparedProof { proof, block_info } = prepare_block_proof(
600                    &block.block_stuff_aug.data,
601                    &block.consensus_info,
602                    &block.signatures,
603                    block.total_signature_weight,
604                )
605                .unwrap_or_else(|e| {
606                    panic!(
607                        "failed to prepare block proof for {:?}: {e:?}",
608                        block.block_stuff_aug.id()
609                    )
610                });
611
612                let block_proof_stuff = BlockProofStuff::from_proof(proof);
613
614                let proof_boc = BocRepr::encode_rayon(block_proof_stuff.as_ref())
615                    .expect("valid block proof must be successfully serialized");
616                let archive_data = block_proof_stuff.with_archive_data(proof_boc);
617
618                (block_info, archive_data)
619            }
620        })
621        .await;
622
623        let _histogram =
624            HistogramGuard::begin("tycho_collator_state_adapter_save_block_proof_time_high");
625
626        let block_storage = self.storage.block_storage();
627        let result = block_storage
628            .store_block_proof(
629                &archive_data,
630                MaybeExistingHandle::New(NewBlockMeta {
631                    is_key_block: block_info.key_block,
632                    gen_utime: block_info.gen_utime,
633                    ref_by_mc_seqno: block.ref_by_mc_seqno,
634                }),
635            )
636            .await?;
637        let is_new_proof = result.new;
638        let is_proof_updated = result.updated;
639
640        let result = block_storage
641            .store_queue_diff(&block.queue_diff_aug, result.handle.into())
642            .await?;
643        let is_new_diff = result.new;
644        let is_diff_updated = result.updated;
645
646        tracing::info!(
647            block_id = %result.handle.id(),
648            is_new_proof,
649            is_proof_updated,
650            is_new_diff,
651            is_diff_updated,
652            "block saved",
653        );
654
655        Ok(())
656    }
657}
658
659#[derive(Clone)]
660struct ShardBlockData {
661    prev_id: BlockId,
662    #[expect(unused)] // might be needed later
663    ref_by_mc_seqno: u32,
664    state_update: MerkleUpdate,
665    _prev_id_alt: Option<BlockId>, // TODO: consider split/merge
666}
667
668#[derive(Clone)]
669struct DelayedStateContext {
670    pub mc_block_id: BlockId,
671    pub state: ShardStateStuff,
672    pub is_external: bool,
673    pub sync_context: CollatorSyncContext,
674}
675
676#[derive(Default, Clone)]
677struct DelayedStateNotifier {
678    inner: Arc<Mutex<Option<DelayedStateContext>>>,
679}
680impl DelayedStateNotifier {
681    pub async fn send_delayed_if<F>(
682        &self,
683        listener: Arc<dyn StateNodeEventListener>,
684        check_should_send: F,
685    ) -> Result<()>
686    where
687        F: Fn(CollatorSyncContext) -> bool,
688    {
689        let state_cx = {
690            let mut guard = self.inner.lock();
691            match guard.as_ref() {
692                Some(state_cx) if check_should_send(state_cx.sync_context) => guard.take(),
693                _ => None,
694            }
695        };
696
697        // do nothing if no delayed state
698        // do nothing if should not send according to check
699
700        Self::send_impl(listener, state_cx).await
701    }
702
703    pub async fn send_or_delay<F>(
704        &self,
705        listener: Arc<dyn StateNodeEventListener>,
706        mc_block_id: &BlockId,
707        state: ShardStateStuff,
708        is_external: bool,
709        sync_context: CollatorSyncContext,
710        check_should_send: F,
711    ) -> Result<()>
712    where
713        F: Fn(CollatorSyncContext) -> bool,
714    {
715        let state_cx = DelayedStateContext {
716            mc_block_id: *mc_block_id,
717            state,
718            is_external,
719            sync_context,
720        };
721
722        let state_cx = {
723            let mut guard = self.inner.lock();
724            if check_should_send(state_cx.sync_context) {
725                guard.take();
726                Some(state_cx)
727            } else {
728                guard.replace(state_cx);
729                None
730            }
731        };
732
733        // send if needed according to check
734
735        Self::send_impl(listener, state_cx).await
736    }
737
738    async fn send_impl(
739        listener: Arc<dyn StateNodeEventListener>,
740        state_cx: Option<DelayedStateContext>,
741    ) -> Result<()> {
742        let Some(DelayedStateContext {
743            mc_block_id,
744            state,
745            is_external,
746            ..
747        }) = state_cx
748        else {
749            return Ok(());
750        };
751
752        if is_external {
753            tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "handle_state: handled external: {}", state.block_id());
754            listener
755                .on_block_accepted_external(&mc_block_id, &state)
756                .await
757        } else {
758            tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "handle_state: handled own: {}", state.block_id());
759            listener.on_block_accepted(&state).await
760        }
761    }
762}
763
764#[expect(
765    clippy::disallowed_methods,
766    reason = "We are working with a virtual block here, so `load_extra` and other methods are necessary"
767)]
768fn prepare_block_proof(
769    block_stuff: &BlockStuff,
770    consensus_info: &ConsensusInfo,
771    signatures: &FastHashMap<PeerId, ArcSignature>,
772    total_signature_weight: u64,
773) -> Result<PreparedProof> {
774    let _histogram = HistogramGuard::begin("tycho_collator_state_adapter_prepare_block_proof_time");
775
776    let usage_tree = UsageTree::new(UsageTreeMode::OnLoad);
777    let tracked_cell = usage_tree.track(block_stuff.root_cell());
778    let block = tracked_cell.parse::<Block>()?;
779    block.value_flow.inner().as_ref().touch_recursive();
780
781    let block_info = block.load_info()?;
782
783    block_info.load_prev_ref()?;
784    block_info.prev_vert_ref.as_ref().map(|x| x.load());
785    block_info.master_ref.as_ref().map(|x| x.load());
786
787    // NOTE: Make sure to "visit" the `out_msg_queue_updates` if we add some
788    //       child cells to it. For now it is loaded inside `.parse::<Block>()`.
789
790    let _state_update = block.load_state_update();
791
792    if let Some(custom) = block.load_extra()?.load_custom()? {
793        // Include full shard description info.
794        if let Some(root) = custom.shards.as_dict().root() {
795            root.touch_recursive();
796        }
797
798        if let Some(config) = &custom.config {
799            // Include collation configuration params.
800            config.get::<ConfigParam28>()?;
801            // Include all validator sets.
802            for param_id in 32..=38 {
803                if let Some(mut vset) = config.get_raw(param_id)? {
804                    ValidatorSet::load_from(&mut vset)?;
805                }
806            }
807        }
808    }
809
810    let merkle_proof = MerkleProof::create(block_stuff.root_cell().as_ref(), usage_tree).build()?;
811
812    let root = CellBuilder::build_from(merkle_proof)?;
813
814    let signatures = if block_stuff.id().is_masterchain() {
815        Some(process_signatures(
816            block_info.gen_validator_list_hash_short,
817            block_info.gen_catchain_seqno,
818            total_signature_weight,
819            consensus_info,
820            signatures,
821        )?)
822    } else {
823        None
824    };
825
826    Ok(PreparedProof {
827        proof: Box::new(BlockProof {
828            proof_for: *block_stuff.id(),
829            root,
830            signatures,
831        }),
832        block_info,
833    })
834}
835
836fn process_signatures(
837    gen_validator_list_hash_short: u32,
838    gen_session_seqno: u32,
839    total_weight: u64,
840    consensus_info: &ConsensusInfo,
841    block_signatures: &FastHashMap<PeerId, ArcSignature>,
842) -> Result<tycho_types::models::block::BlockSignatures> {
843    use tycho_types::dict;
844
845    // TODO: Add helper for owned iter
846    let signatures = Dict::from_raw(dict::build_dict_from_sorted_iter(
847        block_signatures
848            .iter()
849            .enumerate()
850            .map(|(i, (key, value))| {
851                let key_hash = tl_proto::hash(tycho_crypto::tl::PublicKey::Ed25519 {
852                    key: key.as_bytes(),
853                });
854
855                (i as u16, BlockSignature {
856                    node_id_short: key_hash.into(),
857                    signature: Signature(*value.as_ref()),
858                })
859            }),
860        Cell::empty_context(),
861    )?);
862
863    Ok(tycho_types::models::block::BlockSignatures {
864        validator_info: ValidatorBaseInfo {
865            validator_list_hash_short: gen_validator_list_hash_short,
866            catchain_seqno: gen_session_seqno,
867        },
868        consensus_info: *consensus_info,
869        signature_count: block_signatures.len() as u32,
870        total_weight,
871        signatures,
872    })
873}
874
875#[derive(Debug, Copy, Clone, PartialEq, Eq)]
876pub enum CollatorSyncContext {
877    Persistent,
878    Historical,
879    Recent,
880}
881
882struct PreparedProof {
883    proof: Box<BlockProof>,
884    block_info: BlockInfo,
885}
886
887enum BlockIdToWait<'a> {
888    Short(&'a BlockIdShort),
889    Full(&'a BlockId),
890}
891
892impl BlockIdToWait<'_> {
893    fn shard(&self) -> ShardIdent {
894        match self {
895            Self::Short(id) => id.shard,
896            Self::Full(id) => id.shard,
897        }
898    }
899
900    fn seqno(&self) -> u32 {
901        match self {
902            Self::Short(id) => id.seqno,
903            Self::Full(id) => id.seqno,
904        }
905    }
906}
907
908impl PartialEq<BlockId> for BlockIdToWait<'_> {
909    fn eq(&self, other: &BlockId) -> bool {
910        match *self {
911            BlockIdToWait::Short(short) => &other.as_short_id() == short,
912            BlockIdToWait::Full(full) => full == other,
913        }
914    }
915}