Skip to main content

tycho_collator/mempool/impls/std_impl/
adapter_impl.rs

1use anyhow::Result;
2use bytes::Bytes;
3use tycho_types::models::{ConsensusConfig, GenesisInfo};
4
5use crate::mempool::{
6    GetAnchorResult, MempoolAdapter, MempoolAdapterStdImpl, MempoolAnchorId, StateUpdateContext,
7};
8use crate::tracing_targets;
9use crate::types::processed_upto::BlockSeqno;
10
11#[async_trait::async_trait]
12impl MempoolAdapter for MempoolAdapterStdImpl {
13    async fn handle_mc_state_update(&self, new_cx: Box<StateUpdateContext>) -> Result<()> {
14        tracing::debug!(
15            target: tracing_targets::MEMPOOL_ADAPTER,
16            tka = %new_cx.top_processed_to_anchor_id,
17            "Received state update from mc block",
18        );
19        assert!(new_cx.mc_block_id.is_masterchain(), "expect only MC data");
20
21        // assume first block versions are monotonic by both top anchor and seqno
22        // and there may be a second block version out of particular order,
23        // but strictly before `handle_top_processed_to_anchor()` is called;
24        // handle_top_processed_to_anchor() is called with monotonically increasing anchors
25        let mut keeper_guard = self.keeper.lock().await;
26
27        // collator won't receive any anchors since the prepare until the block gets signed
28        keeper_guard.check_expect_genesis_change(&self.cache, &new_cx)?;
29
30        if let Some(ctx) = keeper_guard.state_update_queue.push(new_cx)? {
31            self.process_state_update(&mut keeper_guard, &ctx).await?;
32            (self.top_known_anchor).set_max_raw(ctx.top_processed_to_anchor_id);
33            keeper_guard.last_state_update = Some(*ctx);
34        }
35
36        Ok(())
37    }
38
39    async fn handle_signed_mc_block(&self, mc_block_seqno: BlockSeqno) -> Result<()> {
40        let mut keeper_guard = self.keeper.lock().await;
41
42        for ctx in keeper_guard.state_update_queue.signed(mc_block_seqno)? {
43            self.process_state_update(&mut keeper_guard, &ctx).await?;
44            (self.top_known_anchor).set_max_raw(ctx.top_processed_to_anchor_id);
45            keeper_guard.last_state_update = Some(*ctx);
46        }
47        Ok(())
48    }
49
50    async fn get_anchor_by_id(&self, anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
51        tracing::debug!(
52            target: tracing_targets::MEMPOOL_ADAPTER,
53            %anchor_id,
54            "get_anchor_by_id"
55        );
56
57        let result = match self.cache.get_anchor_by_id(anchor_id).await {
58            Some(anchor) => GetAnchorResult::Exist(anchor),
59            None => GetAnchorResult::NotExist,
60        };
61
62        Ok(result)
63    }
64
65    async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<GetAnchorResult> {
66        tracing::debug!(
67            target: tracing_targets::MEMPOOL_ADAPTER,
68            %prev_anchor_id,
69            "get_next_anchor"
70        );
71
72        let result = match self.cache.get_next_anchor(prev_anchor_id).await? {
73            Some(anchor) => GetAnchorResult::Exist(anchor),
74            None => GetAnchorResult::NotExist,
75        };
76
77        Ok(result)
78    }
79
80    fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()> {
81        self.cache.clear(before_anchor_id);
82        Ok(())
83    }
84
85    fn accept_external(&self, message: Bytes) {
86        self.input_buffer.push(message);
87    }
88
89    async fn update_delayed_config(
90        &self,
91        consensus_config: Option<&ConsensusConfig>,
92        genesis_info: &GenesisInfo,
93    ) -> Result<()> {
94        let mut keeper_guard = self.keeper.lock().await;
95        if let Some(consensus_config) = consensus_config {
96            (keeper_guard.config_builder).set_consensus_config(consensus_config)?;
97        } // else: will be set from mc state after sync
98
99        keeper_guard.config_builder.set_genesis(*genesis_info);
100        Ok(())
101    }
102}