Skip to main content

tycho_collator/mempool/impls/single_node_impl/
mod.rs

1mod adapter_impl;
2mod anchor_handler;
3
4use std::sync::Arc;
5
6use anyhow::Result;
7use tokio::sync::Mutex;
8use tokio::time::MissedTickBehavior;
9use tycho_consensus::prelude::*;
10use tycho_network::PeerId;
11use tycho_util::futures::JoinTask;
12
13use crate::mempool::StateUpdateContext;
14use crate::mempool::impls::common::cache::Cache;
15use crate::mempool::impls::common::v_set_adapter::VSetAdapter;
16use crate::mempool::impls::single_node_impl::anchor_handler::SingleNodeAnchorHandler;
17use crate::tracing_targets;
18
19pub struct MempoolAdapterSingleNodeImpl {
20    cache: Arc<Cache>,
21    config: Mutex<SingleNodeConfigAdapter>,
22    local_peer_id: PeerId,
23
24    input_buffer: InputBuffer,
25}
26
27struct SingleNodeConfigAdapter {
28    builder: MempoolConfigBuilder,
29    inner_process: Option<JoinTask<()>>,
30}
31
32impl MempoolAdapterSingleNodeImpl {
33    pub fn new(mempool_node_config: &MempoolNodeConfig, local_peer_id: PeerId) -> Result<Self> {
34        let config_builder = MempoolConfigBuilder::new(mempool_node_config);
35
36        Ok(Self {
37            cache: Default::default(),
38            config: Mutex::new(SingleNodeConfigAdapter {
39                builder: config_builder,
40                inner_process: None,
41            }),
42            local_peer_id,
43            input_buffer: InputBuffer::default(),
44        })
45    }
46}
47
48impl MempoolAdapterSingleNodeImpl {
49    fn process_start(
50        &self,
51        config_guard: &mut SingleNodeConfigAdapter,
52        ctx: &StateUpdateContext,
53    ) -> Result<()> {
54        let merged_conf = config_guard.builder.build()?;
55
56        self.input_buffer.apply_config(&merged_conf.conf.consensus);
57
58        let timeout =
59            merged_conf.conf.consensus.min_round_duration_millis().get() * WAVE_ROUNDS as u64;
60
61        let mut interval = tokio::time::interval(std::time::Duration::from_millis(timeout));
62        interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
63
64        let input_buffer = self.input_buffer.clone();
65
66        let mut anchor_handler = SingleNodeAnchorHandler::new(
67            self.cache.clone(),
68            self.local_peer_id,
69            ctx.top_processed_to_anchor_id,
70            &merged_conf.conf.consensus,
71        );
72
73        let v_set_len = VSetAdapter::init_peers(ctx)?.curr_v_set.len();
74        if v_set_len == 2 {
75            anyhow::bail!("cannot run mempool with 2 nodes, gen network with either 1 or 3 nodes");
76        } else if v_set_len > 2 {
77            anyhow::bail!("cannot run {v_set_len} nodes with `single-node` cli flag");
78        };
79
80        config_guard.inner_process = Some(JoinTask::new(async move {
81            scopeguard::defer!(tracing::warn!(
82                target: tracing_targets::MEMPOOL_ADAPTER,
83                "Single node Mempool task stopped"
84            ));
85
86            loop {
87                interval.tick().await;
88
89                let external_messages = input_buffer.fetch(true);
90
91                anchor_handler = anchor_handler.handle(external_messages).await;
92            }
93        }));
94
95        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Single node Mempool task started");
96
97        Ok(())
98    }
99}