tycho_collator/mempool/impls/std_impl/
adapter_impl.rs1use anyhow::Result;
2use bytes::Bytes;
3use tycho_types::models::{ConsensusConfig, GenesisInfo};
4
5use crate::mempool::{
6 GetAnchorResult, MempoolAdapter, MempoolAdapterStdImpl, MempoolAnchorId, StateUpdateContext,
7};
8use crate::tracing_targets;
9use crate::types::processed_upto::BlockSeqno;
10
11#[async_trait::async_trait]
12impl MempoolAdapter for MempoolAdapterStdImpl {
13 async fn handle_mc_state_update(&self, new_cx: Box<StateUpdateContext>) -> Result<()> {
14 tracing::debug!(
15 target: tracing_targets::MEMPOOL_ADAPTER,
16 tka = %new_cx.top_processed_to_anchor_id,
17 "Received state update from mc block",
18 );
19 assert!(new_cx.mc_block_id.is_masterchain(), "expect only MC data");
20
21 let mut keeper_guard = self.keeper.lock().await;
26
27 keeper_guard.check_expect_genesis_change(&self.cache, &new_cx)?;
29
30 if let Some(ctx) = keeper_guard.state_update_queue.push(new_cx)? {
31 self.process_state_update(&mut keeper_guard, &ctx).await?;
32 (self.top_known_anchor).set_max_raw(ctx.top_processed_to_anchor_id);
33 keeper_guard.last_state_update = Some(*ctx);
34 }
35
36 Ok(())
37 }
38
39 async fn handle_signed_mc_block(&self, mc_block_seqno: BlockSeqno) -> Result<()> {
40 let mut keeper_guard = self.keeper.lock().await;
41
42 for ctx in keeper_guard.state_update_queue.signed(mc_block_seqno)? {
43 self.process_state_update(&mut keeper_guard, &ctx).await?;
44 (self.top_known_anchor).set_max_raw(ctx.top_processed_to_anchor_id);
45 keeper_guard.last_state_update = Some(*ctx);
46 }
47 Ok(())
48 }
49
50 async fn get_anchor_by_id(&self, anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
51 tracing::debug!(
52 target: tracing_targets::MEMPOOL_ADAPTER,
53 %anchor_id,
54 "get_anchor_by_id"
55 );
56
57 let result = match self.cache.get_anchor_by_id(anchor_id).await {
58 Some(anchor) => GetAnchorResult::Exist(anchor),
59 None => GetAnchorResult::NotExist,
60 };
61
62 Ok(result)
63 }
64
65 async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
66 tracing::debug!(
67 target: tracing_targets::MEMPOOL_ADAPTER,
68 %prev_anchor_id,
69 "get_next_anchor"
70 );
71
72 let result = match self.cache.get_next_anchor(prev_anchor_id).await? {
73 Some(anchor) => GetAnchorResult::Exist(anchor),
74 None => GetAnchorResult::NotExist,
75 };
76
77 Ok(result)
78 }
79
80 fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
81 self.cache.clear(before_anchor_id);
82 Ok(())
83 }
84
85 fn accept_external(&self, message: Bytes) {
86 self.input_buffer.push(message);
87 }
88
89 async fn update_delayed_config(
90 &self,
91 consensus_config: Option<&ConsensusConfig>,
92 genesis_info: &GenesisInfo,
93 ) -> Result<()> {
94 let mut keeper_guard = self.keeper.lock().await;
95 if let Some(consensus_config) = consensus_config {
96 (keeper_guard.config_builder).set_consensus_config(consensus_config)?;
97 } keeper_guard.config_builder.set_genesis(*genesis_info);
100 Ok(())
101 }
102}