tycho_collator/mempool/impls/single_node_impl/
adapter_impl.rs1use anyhow::Result;
2use bytes::Bytes;
3use tycho_types::models::{ConsensusConfig, GenesisInfo};
4
5use crate::mempool::{
6 GetAnchorResult, MempoolAdapter, MempoolAdapterSingleNodeImpl, MempoolAnchorId,
7 StateUpdateContext,
8};
9use crate::tracing_targets;
10use crate::types::processed_upto::BlockSeqno;
11
12#[async_trait::async_trait]
13impl MempoolAdapter for MempoolAdapterSingleNodeImpl {
14 async fn handle_mc_state_update(&self, new_cx: Box<StateUpdateContext>) -> Result<()> {
15 let mut config_guard = self.config.lock().await;
16
17 tracing::debug!(
18 target: tracing_targets::MEMPOOL_ADAPTER,
19 full_id = %new_cx.mc_block_id,
20 "Received state update from mc block",
21 );
22
23 let cfg = &new_cx.consensus_config;
24 tracing::info!(
25 target: tracing_targets::MEMPOOL_ADAPTER,
26 "handle_mc_state_update: consensus config={:?}", cfg
27 );
28
29 (config_guard.builder).set_genesis(new_cx.consensus_info.genesis_info);
32 (config_guard.builder).set_consensus_config(&new_cx.consensus_config)?;
33
34 if config_guard.inner_process.is_none() {
35 self.process_start(&mut config_guard, &new_cx)?;
36 }
37
38 Ok(())
39 }
40
41 async fn handle_signed_mc_block(&self, mc_block_seqno: BlockSeqno) -> Result<()> {
42 let _span = tracing::error_span!("mc_state_update", seq_no = mc_block_seqno).entered();
43 tracing::debug!(
44 target: tracing_targets::MEMPOOL_ADAPTER,
45 "call handle_signed_mc_block"
46 );
47
48 Ok(())
49 }
50
51 async fn get_anchor_by_id(&self, anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
52 tracing::debug!(
53 target: tracing_targets::MEMPOOL_ADAPTER,
54 %anchor_id,
55 "get_anchor_by_id"
56 );
57
58 let result = match self.cache.get_anchor_by_id(anchor_id).await {
59 Some(anchor) => GetAnchorResult::Exist(anchor),
60 None => GetAnchorResult::NotExist,
61 };
62
63 Ok(result)
64 }
65
66 async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
67 tracing::debug!(
68 target: tracing_targets::MEMPOOL_ADAPTER,
69 %prev_anchor_id,
70 "get_next_anchor"
71 );
72
73 let result = match self.cache.get_next_anchor(prev_anchor_id).await? {
74 Some(anchor) => GetAnchorResult::Exist(anchor),
75 None => GetAnchorResult::NotExist,
76 };
77
78 Ok(result)
79 }
80
81 fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
82 self.cache.clear(before_anchor_id);
83 Ok(())
84 }
85
86 fn accept_external(&self, message: Bytes) {
87 self.input_buffer.push(message);
88 }
89
90 async fn update_delayed_config(
91 &self,
92 consensus_config: Option<&ConsensusConfig>,
93 genesis_info: &GenesisInfo,
94 ) -> Result<()> {
95 let mut config_guard = self.config.lock().await;
96 if let Some(consensus_config) = consensus_config {
97 (config_guard.builder).set_consensus_config(consensus_config)?;
98 } config_guard.builder.set_genesis(*genesis_info);
101 Ok(())
102 }
103}