Skip to main content

tycho_collator/mempool/impls/single_node_impl/
adapter_impl.rs

1use anyhow::Result;
2use bytes::Bytes;
3use tycho_types::models::{ConsensusConfig, GenesisInfo};
4
5use crate::mempool::{
6    GetAnchorResult, MempoolAdapter, MempoolAdapterSingleNodeImpl, MempoolAnchorId,
7    StateUpdateContext,
8};
9use crate::tracing_targets;
10use crate::types::processed_upto::BlockSeqno;
11
12#[async_trait::async_trait]
13impl MempoolAdapter for MempoolAdapterSingleNodeImpl {
14    async fn handle_mc_state_update(&self, new_cx: Box<StateUpdateContext>) -> Result<()> {
15        let mut config_guard = self.config.lock().await;
16
17        tracing::debug!(
18            target: tracing_targets::MEMPOOL_ADAPTER,
19            full_id = %new_cx.mc_block_id,
20            "Received state update from mc block",
21        );
22
23        let cfg = &new_cx.consensus_config;
24        tracing::info!(
25            target: tracing_targets::MEMPOOL_ADAPTER,
26            "handle_mc_state_update: consensus config={:?}", cfg
27        );
28
29        // we don't use state update queue and assume every block is signed by ourselves
30
31        (config_guard.builder).set_genesis(new_cx.consensus_info.genesis_info);
32        (config_guard.builder).set_consensus_config(&new_cx.consensus_config)?;
33
34        if config_guard.inner_process.is_none() {
35            self.process_start(&mut config_guard, &new_cx)?;
36        }
37
38        Ok(())
39    }
40
41    async fn handle_signed_mc_block(&self, mc_block_seqno: BlockSeqno) -> Result<()> {
42        let _span = tracing::error_span!("mc_state_update", seq_no = mc_block_seqno).entered();
43        tracing::debug!(
44            target: tracing_targets::MEMPOOL_ADAPTER,
45            "call handle_signed_mc_block"
46        );
47
48        Ok(())
49    }
50
51    async fn get_anchor_by_id(&self, anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
52        tracing::debug!(
53            target: tracing_targets::MEMPOOL_ADAPTER,
54            %anchor_id,
55            "get_anchor_by_id"
56        );
57
58        let result = match self.cache.get_anchor_by_id(anchor_id).await {
59            Some(anchor) => GetAnchorResult::Exist(anchor),
60            None => GetAnchorResult::NotExist,
61        };
62
63        Ok(result)
64    }
65
66    async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
67        tracing::debug!(
68            target: tracing_targets::MEMPOOL_ADAPTER,
69            %prev_anchor_id,
70            "get_next_anchor"
71        );
72
73        let result = match self.cache.get_next_anchor(prev_anchor_id).await? {
74            Some(anchor) => GetAnchorResult::Exist(anchor),
75            None => GetAnchorResult::NotExist,
76        };
77
78        Ok(result)
79    }
80
81    fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
82        self.cache.clear(before_anchor_id);
83        Ok(())
84    }
85
86    fn accept_external(&self, message: Bytes) {
87        self.input_buffer.push(message);
88    }
89
90    async fn update_delayed_config(
91        &self,
92        consensus_config: Option<&ConsensusConfig>,
93        genesis_info: &GenesisInfo,
94    ) -> Result<()> {
95        let mut config_guard = self.config.lock().await;
96        if let Some(consensus_config) = consensus_config {
97            (config_guard.builder).set_consensus_config(consensus_config)?;
98        } // else: will be set from mc state after sync
99
100        config_guard.builder.set_genesis(*genesis_info);
101        Ok(())
102    }
103}