1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Instant;
4
5use anyhow::{Context, Result, anyhow};
6use async_trait::async_trait;
7use parking_lot::Mutex;
8use tokio::sync::{broadcast, watch};
9use tycho_block_util::block::{BlockProofStuff, BlockStuff, BlockStuffAug};
10use tycho_block_util::queue::QueueDiffStuff;
11use tycho_block_util::state::ShardStateStuff;
12use tycho_core::global_config::ZerostateId;
13use tycho_core::storage::{
14 BlockHandle, BlockInfoForApply, CoreStorage, LoadStateHint, MaybeExistingHandle, NewBlockMeta,
15 StoreStateHint,
16};
17use tycho_network::PeerId;
18use tycho_types::boc::BocRepr;
19use tycho_types::merkle::{MerkleProof, MerkleUpdate};
20use tycho_types::models::*;
21use tycho_types::prelude::*;
22use tycho_util::mem::Reclaimer;
23use tycho_util::metrics::HistogramGuard;
24use tycho_util::sync::rayon_run;
25use tycho_util::{FastDashMap, FastHashMap};
26
27use crate::tracing_targets;
28use crate::types::processed_upto::BlockSeqno;
29use crate::types::{ArcSignature, BlockStuffForSync};
30
31pub trait StateNodeAdapterFactory {
34 type Adapter: StateNodeAdapter;
35
36 fn create(&self, listener: Arc<dyn StateNodeEventListener>) -> Self::Adapter;
37}
38
39impl<F, R> StateNodeAdapterFactory for F
40where
41 F: Fn(Arc<dyn StateNodeEventListener>) -> R,
42 R: StateNodeAdapter,
43{
44 type Adapter = R;
45
46 fn create(&self, listener: Arc<dyn StateNodeEventListener>) -> Self::Adapter {
47 self(listener)
48 }
49}
50
51#[async_trait]
52pub trait StateNodeEventListener: Send + Sync {
53 async fn on_block_accepted(&self, state: &ShardStateStuff) -> Result<()>;
55 async fn on_block_accepted_external(
57 &self,
58 mc_block_id: &BlockId,
59 state: &ShardStateStuff,
60 ) -> Result<()>;
61}
62
63#[async_trait]
64pub trait StateNodeAdapter: Send + Sync + 'static {
65 fn load_last_applied_mc_block_id(&self) -> Result<BlockId>;
67 async fn load_state(
69 &self,
70 ref_by_mc_seqno: u32,
71 block_id: &BlockId,
72 hint: LoadStateHint,
73 ) -> Result<ShardStateStuff>;
74 async fn begin_store_next_state(
77 &self,
78 prev_block_id: &BlockId,
79 block_id: &BlockId,
80 meta: NewBlockMeta,
81 merkle_update: &MerkleUpdate,
82 state: ShardStateStuff,
83 hint: StoreStateHint,
84 ) -> Result<Box<dyn InitiatedStoreState>>;
85 async fn load_block(&self, block_id: &BlockId) -> Result<Option<BlockStuff>>;
87 async fn load_block_by_handle(&self, handle: &BlockHandle) -> Result<Option<BlockStuff>>;
89 async fn load_block_handle(&self, block_id: &BlockId) -> Result<Option<BlockHandle>>;
91 async fn get_ref_by_mc_seqno(&self, block_id: &BlockId) -> Result<Option<BlockSeqno>>;
93 fn accept_block(&self, block: Arc<BlockStuffForSync>) -> Result<()>;
97 fn fill_shard_blocks_cache(&self, ref_by_mc_seqno: u32, block: BlockStuff) -> Result<()>;
99 async fn wait_for_block(&self, block_id: &BlockId) -> Option<Result<BlockStuffAug>>;
101 async fn wait_for_block_next(&self, prev_id: &BlockId) -> Option<Result<BlockStuffAug>>;
103 async fn handle_state(&self, mc_block_id: &BlockId, state: &ShardStateStuff) -> Result<()>;
105 async fn load_diff(&self, block_id: &BlockId) -> Result<Option<QueueDiffStuff>>;
107 fn set_sync_context(&self, sync_context: CollatorSyncContext);
109 fn load_init_block_id(&self) -> Option<BlockId>;
110 fn zerostate_id(&self) -> &ZerostateId;
111 fn shard_split_depth(&self) -> u8;
112}
113
114#[async_trait]
115pub trait InitiatedStoreState: Send + 'static {
116 async fn wait_store_only(self: Box<Self>) -> Result<()>;
117 async fn wait_reload(self: Box<Self>) -> Result<ShardStateStuff>;
118}
119
120pub struct StateNodeAdapterStdImpl {
121 listener: Arc<dyn StateNodeEventListener>,
122 blocks: FastDashMap<ShardIdent, BTreeMap<u32, Arc<BlockStuffForSync>>>,
123 shard_blocks: Arc<ShardBlocksMap>,
124 storage: CoreStorage,
125 broadcaster: broadcast::Sender<BlockId>,
126
127 sync_context_tx: watch::Sender<CollatorSyncContext>,
128
129 delayed_state_notifier: DelayedStateNotifier,
130 zerostate_id: ZerostateId,
131}
132
133type ShardBlocksMap = FastDashMap<ShardIdent, BTreeMap<BlockId, ShardBlockData>>;
134
135impl StateNodeAdapterStdImpl {
136 pub fn new(
137 listener: Arc<dyn StateNodeEventListener>,
138 storage: CoreStorage,
139 initial_sync_context: CollatorSyncContext,
140 zerostate_id: ZerostateId,
141 ) -> Self {
142 let (sync_context_tx, mut sync_context_rx) = watch::channel(initial_sync_context);
143 let (broadcaster, _) = broadcast::channel(10000);
144
145 let adapter = Self {
146 listener,
147 storage,
148 blocks: Default::default(),
149 shard_blocks: Default::default(),
150 broadcaster,
151 sync_context_tx,
152 delayed_state_notifier: DelayedStateNotifier::default(),
153 zerostate_id,
154 };
155
156 tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "Start watching for sync context updates");
157
158 tokio::spawn({
159 let listener = adapter.listener.clone();
160 let delayed_state_notifier = adapter.delayed_state_notifier.clone();
161 async move {
162 while sync_context_rx.changed().await.is_ok() {
163 let sync_ctx = *sync_context_rx.borrow();
164
165 delayed_state_notifier
166 .send_delayed_if(listener.clone(), |delayed_sync_ctx| {
167 let check = sync_ctx == CollatorSyncContext::Recent
169 && delayed_sync_ctx != CollatorSyncContext::Recent;
170 if check {
171 tracing::debug!(
172 target: tracing_targets::STATE_NODE_ADAPTER,
173 sync_ctx = ?sync_ctx,
174 delayed_sync_ctx = ?delayed_sync_ctx,
175 "handle_sync_context_update: will process delayed state",
176 );
177 } else {
178 tracing::debug!(
179 target: tracing_targets::STATE_NODE_ADAPTER,
180 sync_ctx = ?sync_ctx,
181 delayed_sync_ctx = ?delayed_sync_ctx,
182 "handle_sync_context_update: will not process delayed state",
183 );
184 }
185 check
186 })
187 .await
188 .unwrap();
189 }
190 }
191 });
192
193 tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "State node adapter created");
194
195 adapter
196 }
197}
198
199#[async_trait]
200impl StateNodeAdapter for StateNodeAdapterStdImpl {
201 fn load_last_applied_mc_block_id(&self) -> Result<BlockId> {
202 let las_applied_mc_block_id = self
203 .storage
204 .node_state()
205 .load_last_mc_block_id()
206 .context("no blocks applied yet")?;
207
208 tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER,
209 "Loaded last applied mc block id {}",
210 las_applied_mc_block_id.as_short_id(),
211 );
212
213 Ok(las_applied_mc_block_id)
214 }
215
216 async fn load_state(
217 &self,
218 ref_by_mc_seqno: u32,
219 block_id: &BlockId,
220 hint: LoadStateHint,
221 ) -> Result<ShardStateStuff> {
222 let _histogram = HistogramGuard::begin("tycho_collator_state_load_state_time");
223
224 tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Load state: {}", block_id.as_short_id());
225
226 self.storage
227 .shard_state_storage()
228 .load_state_ext(ref_by_mc_seqno, block_id, hint, |block_id| {
229 self.shard_blocks
230 .get(&block_id.shard)
231 .and_then(|shard_blocks| {
232 let block = shard_blocks.get(block_id)?;
233 Some(BlockInfoForApply {
234 prev_block_id: block.prev_id,
235 partial_root_cell: block.state_update.new.clone(),
236 })
237 })
238 })
239 .await
240 }
241
242 async fn begin_store_next_state(
243 &self,
244 prev_block_id: &BlockId,
245 block_id: &BlockId,
246 meta: NewBlockMeta,
247 merkle_update: &MerkleUpdate,
248 state: ShardStateStuff,
249 hint: StoreStateHint,
250 ) -> Result<Box<dyn InitiatedStoreState>> {
251 struct HistogramOnDrop {
252 workchain: i32,
253 started_at: Instant,
254 }
255
256 impl Drop for HistogramOnDrop {
257 fn drop(&mut self) {
258 metrics::histogram!(
259 "tycho_collator_state_store_state_root_time_high",
260 "workchain" => self.workchain.to_string(),
261 )
262 .record(self.started_at.elapsed());
263 }
264 }
265
266 struct StoreOperation {
267 _metrics: HistogramOnDrop,
268 inner: tycho_core::storage::InitiatedStoreState,
269 }
270
271 #[async_trait]
272 impl InitiatedStoreState for StoreOperation {
273 async fn wait_store_only(self: Box<Self>) -> Result<()> {
274 self.inner.wait_store_only().await
275 }
276
277 async fn wait_reload(self: Box<Self>) -> Result<ShardStateStuff> {
278 self.inner.wait_reload().await
279 }
280 }
281
282 let metrics = HistogramOnDrop {
283 workchain: block_id.shard.workchain(),
284 started_at: Instant::now(),
285 };
286
287 tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Store state root: {}", block_id.as_short_id());
288
289 let prev_handle = self
290 .storage
291 .block_handle_storage()
292 .load_handle(prev_block_id)
293 .context("no prev block handle found")?;
294
295 let (next_handle, _) = self
296 .storage
297 .block_handle_storage()
298 .create_or_load_handle(block_id, meta);
299
300 let get_merkle_update = Box::new({
301 let shard_blocks = self.shard_blocks.clone();
302 move |block_id: &BlockId| {
303 shard_blocks.get(&block_id.shard).and_then(|shard_blocks| {
304 let block = shard_blocks.get(block_id)?;
305 Some(BlockInfoForApply {
306 prev_block_id: block.prev_id,
307 partial_root_cell: block.state_update.new.clone(),
308 })
309 })
310 }
311 });
312
313 let inner = self.storage.shard_state_storage().begin_store_next_state(
314 &prev_handle,
315 &next_handle,
316 merkle_update,
317 Some(state),
318 hint,
319 Some(get_merkle_update),
320 )?;
321
322 Ok(Box::new(StoreOperation {
323 _metrics: metrics,
324 inner,
325 }))
326 }
327
328 async fn load_block(&self, block_id: &BlockId) -> Result<Option<BlockStuff>> {
329 let _histogram = HistogramGuard::begin("tycho_collator_state_load_block_time");
330
331 tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Load block: {}", block_id.as_short_id());
332
333 let handle_storage = self.storage.block_handle_storage();
334
335 match handle_storage.load_handle(block_id) {
336 Some(handle) => self.load_block_by_handle(&handle).await,
337 _ => Ok(None),
338 }
339 }
340
341 async fn load_block_by_handle(&self, handle: &BlockHandle) -> Result<Option<BlockStuff>> {
342 if !handle.has_data() {
343 return Ok(None);
344 }
345
346 tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Load block by handle: {}", handle.id().as_short_id());
347
348 let block_storage = self.storage.block_storage();
349 block_storage.load_block_data(handle).await.map(Some)
350 }
351
352 async fn load_block_handle(&self, block_id: &BlockId) -> Result<Option<BlockHandle>> {
353 tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Load block handle: {}", block_id.as_short_id());
354 Ok(self.storage.block_handle_storage().load_handle(block_id))
355 }
356
357 async fn get_ref_by_mc_seqno(&self, block_id: &BlockId) -> Result<Option<BlockSeqno>> {
358 Ok(self
359 .load_block_handle(block_id)
360 .await?
361 .map(|block_handle| block_handle.ref_by_mc_seqno()))
362 }
363
364 fn accept_block(&self, block: Arc<BlockStuffForSync>) -> Result<()> {
365 let block_id = *block.block_stuff_aug.id();
366
367 tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Block accepted: {}", block_id.as_short_id());
368
369 self.blocks
370 .entry(block_id.shard)
371 .or_default()
372 .insert(block_id.seqno, block);
373
374 let broadcast_result = self.broadcaster.send(block_id).ok();
375 tracing::trace!(target: tracing_targets::STATE_NODE_ADAPTER, "Block broadcast_result: {:?}", broadcast_result);
376 Ok(())
377 }
378
379 fn fill_shard_blocks_cache(&self, ref_by_mc_seqno: u32, block: BlockStuff) -> Result<()> {
380 let block_id = *block.id();
381
382 if !block_id.is_masterchain() {
383 tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Shard block accepted: {}", block_id.as_short_id());
384
385 let state_update = block.block().load_state_update()?;
386 let (prev_id, prev_id_alt) = block.construct_prev_id()?;
387
388 self.shard_blocks
389 .entry(block_id.shard)
390 .or_default()
391 .insert(block_id, ShardBlockData {
392 prev_id,
393 ref_by_mc_seqno,
394 state_update,
395 _prev_id_alt: prev_id_alt,
396 });
397 }
398
399 Ok(())
400 }
401
402 async fn wait_for_block(&self, block_id: &BlockId) -> Option<Result<BlockStuffAug>> {
403 let block_id = BlockIdToWait::Full(block_id);
404 self.wait_for_block_ext(block_id).await
405 }
406
407 async fn wait_for_block_next(&self, prev_block_id: &BlockId) -> Option<Result<BlockStuffAug>> {
408 let next_block_id_short =
409 BlockIdShort::from((prev_block_id.shard, prev_block_id.seqno + 1));
410 let block_id = BlockIdToWait::Short(&next_block_id_short);
411 self.wait_for_block_ext(block_id).await
412 }
413
414 async fn handle_state(&self, mc_block_id: &BlockId, state: &ShardStateStuff) -> Result<()> {
415 let _histogram = HistogramGuard::begin("tycho_collator_state_adapter_handle_state_time");
416
417 let sync_context = *self.sync_context_tx.borrow();
418
419 tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "handle_state: block {}", state.block_id());
420 let block_id = *state.block_id();
421 debug_assert!(!block_id.is_masterchain() || &block_id == mc_block_id);
422
423 let mut to_split = Vec::new();
424
425 let shard = block_id.shard;
426 let seqno = block_id.seqno;
427
428 {
429 let has_block = if let Some(shard_blocks) = self.blocks.get(&shard) {
430 let has_block = shard_blocks.contains_key(&seqno);
431
432 if shard.is_masterchain() {
433 let prev_mc_block = shard_blocks
434 .range(..=seqno)
435 .rev()
436 .find_map(|(&key, value)| if key < seqno { Some(value) } else { None });
437
438 if let Some(prev_mc_block) = prev_mc_block {
439 for id in &prev_mc_block.top_shard_blocks_ids {
440 to_split.push((id.shard, id.seqno + 1));
441 }
442 to_split.push((shard, prev_mc_block.block_stuff_aug.id().seqno + 1));
443 }
444 }
445
446 has_block
447 } else {
448 false
449 };
450
451 self.delayed_state_notifier
452 .send_or_delay(
453 self.listener.clone(),
454 mc_block_id,
455 state.clone(),
456 !has_block,
457 sync_context,
458 |sync_ctx| {
459 let check = sync_ctx == CollatorSyncContext::Recent;
460 if !check {
461 tracing::debug!(
462 target: tracing_targets::STATE_NODE_ADAPTER,
463 block_id = %state.block_id().as_short_id(),
464 sync_ctx = ?sync_context,
465 "handle_state: will delay state",
466 );
467 }
468 check
469 },
470 )
471 .await?;
472 }
473
474 let mut to_drop = Vec::new();
475 for (shard, seqno) in &to_split {
476 if let Some(mut blocks) = self.blocks.get_mut(shard) {
477 let retained = blocks.split_off(seqno);
478 to_drop.push(std::mem::replace(&mut *blocks, retained));
479 }
480 }
481
482 Reclaimer::instance().drop(to_drop);
484
485 if shard.is_masterchain() {
486 let handle_storage = self.storage.block_handle_storage();
487
488 let mut to_drop = Vec::new();
489 for entry in state.shards()?.latest_blocks() {
490 let top_block_id = entry?;
491 let Some(mut sb) = self.shard_blocks.get_mut(&top_block_id.shard) else {
492 continue;
493 };
494
495 let retain_from = sb
498 .range(..top_block_id)
499 .find(|(block_id, _)| {
500 !matches!(
501 handle_storage.load_handle(block_id),
502 Some(handle) if handle.has_data()
503 )
504 })
505 .map_or(top_block_id, |(block_id, _)| *block_id);
506
507 let retained = sb.split_off(&retain_from);
508 to_drop.push(std::mem::replace(&mut *sb, retained));
509 }
510
511 Reclaimer::instance().drop(to_drop);
513 }
514
515 Ok(())
516 }
517
518 async fn load_diff(&self, block_id: &BlockId) -> Result<Option<QueueDiffStuff>> {
519 let _histogram = HistogramGuard::begin("tycho_collator_state_load_queue_diff_time");
520
521 tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Load queue diff: {}", block_id.as_short_id());
522
523 let handle_storage = self.storage.block_handle_storage();
524 let block_storage = self.storage.block_storage();
525
526 match handle_storage.load_handle(block_id) {
527 Some(handle) if handle.has_queue_diff() => {
528 block_storage.load_queue_diff(&handle).await.map(Some)
529 }
530 _ => Ok(None),
531 }
532 }
533
534 fn set_sync_context(&self, sync_context: CollatorSyncContext) {
535 self.sync_context_tx.send_if_modified(|curr| {
536 if *curr != sync_context {
537 *curr = sync_context;
538 true
539 } else {
540 false
541 }
542 });
543 }
544
545 fn load_init_block_id(&self) -> Option<BlockId> {
546 self.storage.node_state().load_init_mc_block_id()
547 }
548
549 fn zerostate_id(&self) -> &ZerostateId {
550 &self.zerostate_id
551 }
552
553 fn shard_split_depth(&self) -> u8 {
554 self.storage.config().shard_split_depth
555 }
556}
557
558impl StateNodeAdapterStdImpl {
559 async fn wait_for_block_ext(
560 &self,
561 block_id: BlockIdToWait<'_>,
562 ) -> Option<Result<BlockStuffAug>> {
563 let mut receiver = self.broadcaster.subscribe();
564 loop {
565 if let Some(shard_blocks) = self.blocks.get(&block_id.shard()) {
566 let block = shard_blocks.get(&block_id.seqno()).cloned();
567 drop(shard_blocks);
568
569 if let Some(block) = block {
570 return match self.save_block_proof(&block).await {
571 Ok(_) => Some(Ok(block.block_stuff_aug.clone())),
572 Err(e) => Some(Err(anyhow!("failed to save block proof: {e:?}"))),
573 };
574 }
575 }
576
577 loop {
578 match receiver.recv().await {
579 Ok(received_block_id) if block_id == received_block_id => {
580 break;
581 }
582 Ok(_) => {}
583 Err(broadcast::error::RecvError::Lagged(count)) => {
584 tracing::warn!(target: tracing_targets::STATE_NODE_ADAPTER, "Broadcast channel lagged: {}", count);
585 }
586 Err(broadcast::error::RecvError::Closed) => {
587 tracing::error!(target: tracing_targets::STATE_NODE_ADAPTER, "Broadcast channel closed");
588 return None;
589 }
590 }
591 }
592 }
593 }
594
595 async fn save_block_proof(&self, block: &Arc<BlockStuffForSync>) -> Result<()> {
596 let (block_info, archive_data) = rayon_run({
597 let block = block.clone();
598 move || {
599 let PreparedProof { proof, block_info } = prepare_block_proof(
600 &block.block_stuff_aug.data,
601 &block.consensus_info,
602 &block.signatures,
603 block.total_signature_weight,
604 )
605 .unwrap_or_else(|e| {
606 panic!(
607 "failed to prepare block proof for {:?}: {e:?}",
608 block.block_stuff_aug.id()
609 )
610 });
611
612 let block_proof_stuff = BlockProofStuff::from_proof(proof);
613
614 let proof_boc = BocRepr::encode_rayon(block_proof_stuff.as_ref())
615 .expect("valid block proof must be successfully serialized");
616 let archive_data = block_proof_stuff.with_archive_data(proof_boc);
617
618 (block_info, archive_data)
619 }
620 })
621 .await;
622
623 let _histogram =
624 HistogramGuard::begin("tycho_collator_state_adapter_save_block_proof_time_high");
625
626 let block_storage = self.storage.block_storage();
627 let result = block_storage
628 .store_block_proof(
629 &archive_data,
630 MaybeExistingHandle::New(NewBlockMeta {
631 is_key_block: block_info.key_block,
632 gen_utime: block_info.gen_utime,
633 ref_by_mc_seqno: block.ref_by_mc_seqno,
634 }),
635 )
636 .await?;
637 let is_new_proof = result.new;
638 let is_proof_updated = result.updated;
639
640 let result = block_storage
641 .store_queue_diff(&block.queue_diff_aug, result.handle.into())
642 .await?;
643 let is_new_diff = result.new;
644 let is_diff_updated = result.updated;
645
646 tracing::info!(
647 block_id = %result.handle.id(),
648 is_new_proof,
649 is_proof_updated,
650 is_new_diff,
651 is_diff_updated,
652 "block saved",
653 );
654
655 Ok(())
656 }
657}
658
659#[derive(Clone)]
660struct ShardBlockData {
661 prev_id: BlockId,
662 #[expect(unused)] ref_by_mc_seqno: u32,
664 state_update: MerkleUpdate,
665 _prev_id_alt: Option<BlockId>, }
667
668#[derive(Clone)]
669struct DelayedStateContext {
670 pub mc_block_id: BlockId,
671 pub state: ShardStateStuff,
672 pub is_external: bool,
673 pub sync_context: CollatorSyncContext,
674}
675
676#[derive(Default, Clone)]
677struct DelayedStateNotifier {
678 inner: Arc<Mutex<Option<DelayedStateContext>>>,
679}
680impl DelayedStateNotifier {
681 pub async fn send_delayed_if<F>(
682 &self,
683 listener: Arc<dyn StateNodeEventListener>,
684 check_should_send: F,
685 ) -> Result<()>
686 where
687 F: Fn(CollatorSyncContext) -> bool,
688 {
689 let state_cx = {
690 let mut guard = self.inner.lock();
691 match guard.as_ref() {
692 Some(state_cx) if check_should_send(state_cx.sync_context) => guard.take(),
693 _ => None,
694 }
695 };
696
697 Self::send_impl(listener, state_cx).await
701 }
702
703 pub async fn send_or_delay<F>(
704 &self,
705 listener: Arc<dyn StateNodeEventListener>,
706 mc_block_id: &BlockId,
707 state: ShardStateStuff,
708 is_external: bool,
709 sync_context: CollatorSyncContext,
710 check_should_send: F,
711 ) -> Result<()>
712 where
713 F: Fn(CollatorSyncContext) -> bool,
714 {
715 let state_cx = DelayedStateContext {
716 mc_block_id: *mc_block_id,
717 state,
718 is_external,
719 sync_context,
720 };
721
722 let state_cx = {
723 let mut guard = self.inner.lock();
724 if check_should_send(state_cx.sync_context) {
725 guard.take();
726 Some(state_cx)
727 } else {
728 guard.replace(state_cx);
729 None
730 }
731 };
732
733 Self::send_impl(listener, state_cx).await
736 }
737
738 async fn send_impl(
739 listener: Arc<dyn StateNodeEventListener>,
740 state_cx: Option<DelayedStateContext>,
741 ) -> Result<()> {
742 let Some(DelayedStateContext {
743 mc_block_id,
744 state,
745 is_external,
746 ..
747 }) = state_cx
748 else {
749 return Ok(());
750 };
751
752 if is_external {
753 tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "handle_state: handled external: {}", state.block_id());
754 listener
755 .on_block_accepted_external(&mc_block_id, &state)
756 .await
757 } else {
758 tracing::info!(target: tracing_targets::STATE_NODE_ADAPTER, "handle_state: handled own: {}", state.block_id());
759 listener.on_block_accepted(&state).await
760 }
761 }
762}
763
764#[expect(
765 clippy::disallowed_methods,
766 reason = "We are working with a virtual block here, so `load_extra` and other methods are necessary"
767)]
768fn prepare_block_proof(
769 block_stuff: &BlockStuff,
770 consensus_info: &ConsensusInfo,
771 signatures: &FastHashMap<PeerId, ArcSignature>,
772 total_signature_weight: u64,
773) -> Result<PreparedProof> {
774 let _histogram = HistogramGuard::begin("tycho_collator_state_adapter_prepare_block_proof_time");
775
776 let usage_tree = UsageTree::new(UsageTreeMode::OnLoad);
777 let tracked_cell = usage_tree.track(block_stuff.root_cell());
778 let block = tracked_cell.parse::<Block>()?;
779 block.value_flow.inner().as_ref().touch_recursive();
780
781 let block_info = block.load_info()?;
782
783 block_info.load_prev_ref()?;
784 block_info.prev_vert_ref.as_ref().map(|x| x.load());
785 block_info.master_ref.as_ref().map(|x| x.load());
786
787 let _state_update = block.load_state_update();
791
792 if let Some(custom) = block.load_extra()?.load_custom()? {
793 if let Some(root) = custom.shards.as_dict().root() {
795 root.touch_recursive();
796 }
797
798 if let Some(config) = &custom.config {
799 config.get::<ConfigParam28>()?;
801 for param_id in 32..=38 {
803 if let Some(mut vset) = config.get_raw(param_id)? {
804 ValidatorSet::load_from(&mut vset)?;
805 }
806 }
807 }
808 }
809
810 let merkle_proof = MerkleProof::create(block_stuff.root_cell().as_ref(), usage_tree).build()?;
811
812 let root = CellBuilder::build_from(merkle_proof)?;
813
814 let signatures = if block_stuff.id().is_masterchain() {
815 Some(process_signatures(
816 block_info.gen_validator_list_hash_short,
817 block_info.gen_catchain_seqno,
818 total_signature_weight,
819 consensus_info,
820 signatures,
821 )?)
822 } else {
823 None
824 };
825
826 Ok(PreparedProof {
827 proof: Box::new(BlockProof {
828 proof_for: *block_stuff.id(),
829 root,
830 signatures,
831 }),
832 block_info,
833 })
834}
835
836fn process_signatures(
837 gen_validator_list_hash_short: u32,
838 gen_session_seqno: u32,
839 total_weight: u64,
840 consensus_info: &ConsensusInfo,
841 block_signatures: &FastHashMap<PeerId, ArcSignature>,
842) -> Result<tycho_types::models::block::BlockSignatures> {
843 use tycho_types::dict;
844
845 let signatures = Dict::from_raw(dict::build_dict_from_sorted_iter(
847 block_signatures
848 .iter()
849 .enumerate()
850 .map(|(i, (key, value))| {
851 let key_hash = tl_proto::hash(tycho_crypto::tl::PublicKey::Ed25519 {
852 key: key.as_bytes(),
853 });
854
855 (i as u16, BlockSignature {
856 node_id_short: key_hash.into(),
857 signature: Signature(*value.as_ref()),
858 })
859 }),
860 Cell::empty_context(),
861 )?);
862
863 Ok(tycho_types::models::block::BlockSignatures {
864 validator_info: ValidatorBaseInfo {
865 validator_list_hash_short: gen_validator_list_hash_short,
866 catchain_seqno: gen_session_seqno,
867 },
868 consensus_info: *consensus_info,
869 signature_count: block_signatures.len() as u32,
870 total_weight,
871 signatures,
872 })
873}
874
/// Sync context reported to the state node adapter.
///
/// `StateNodeAdapterStdImpl` delivers handled states to the listener immediately only
/// in [`CollatorSyncContext::Recent`]. In the other contexts the latest state is kept
/// and delivered once the context switches to `Recent`.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CollatorSyncContext {
    // NOTE(review): presumably syncing from a persistent state — confirm with the caller.
    Persistent,
    // NOTE(review): presumably syncing historical blocks — confirm with the caller.
    Historical,
    /// The context in which handled states are delivered without delay.
    Recent,
}
881
882struct PreparedProof {
883 proof: Box<BlockProof>,
884 block_info: BlockInfo,
885}
886
887enum BlockIdToWait<'a> {
888 Short(&'a BlockIdShort),
889 Full(&'a BlockId),
890}
891
892impl BlockIdToWait<'_> {
893 fn shard(&self) -> ShardIdent {
894 match self {
895 Self::Short(id) => id.shard,
896 Self::Full(id) => id.shard,
897 }
898 }
899
900 fn seqno(&self) -> u32 {
901 match self {
902 Self::Short(id) => id.seqno,
903 Self::Full(id) => id.seqno,
904 }
905 }
906}
907
908impl PartialEq<BlockId> for BlockIdToWait<'_> {
909 fn eq(&self, other: &BlockId) -> bool {
910 match *self {
911 BlockIdToWait::Short(short) => &other.as_short_id() == short,
912 BlockIdToWait::Full(full) => full == other,
913 }
914 }
915}