Skip to main content

tycho_collator/mempool/impls/std_impl/
mod.rs

1mod adapter_impl;
2mod anchor_handler;
3mod session_keeper;
4mod state_update_queue;
5
6use std::sync::Arc;
7
8use anyhow::Result;
9use futures_util::FutureExt;
10use tokio::sync::{Mutex, mpsc, oneshot};
11use tracing::Instrument;
12use tycho_consensus::prelude::*;
13use tycho_crypto::ed25519::KeyPair;
14use tycho_network::{Network, OverlayService, PeerResolver};
15
16use crate::mempool::StateUpdateContext;
17use crate::mempool::impls::common::cache::Cache;
18use crate::mempool::impls::common::v_set_adapter::VSetAdapter;
19use crate::mempool::impls::std_impl::anchor_handler::StdAnchorHandler;
20use crate::mempool::impls::std_impl::session_keeper::StdSessionKeeper;
21use crate::tracing_targets;
22
23pub struct MempoolAdapterStdImpl {
24    cache: Arc<Cache>,
25    net_args: EngineNetworkArgs,
26
27    keeper: Mutex<StdSessionKeeper>,
28
29    mempool_db: Arc<MempoolDb>,
30    input_buffer: InputBuffer,
31    top_known_anchor: RoundWatch<TopKnownAnchor>,
32}
33
34impl MempoolAdapterStdImpl {
35    pub fn new(
36        key_pair: Arc<KeyPair>,
37        network: &Network,
38        peer_resolver: &PeerResolver,
39        overlay_service: &OverlayService,
40        mempool_db: Arc<MempoolDb>,
41        moderator: Moderator,
42        mempool_node_config: &MempoolNodeConfig,
43    ) -> Result<Self> {
44        Ok(Self {
45            cache: Default::default(),
46            net_args: EngineNetworkArgs {
47                key_pair,
48                network: network.clone(),
49                peer_resolver: peer_resolver.clone(),
50                overlay_service: overlay_service.clone(),
51                moderator,
52            },
53            keeper: Mutex::new(StdSessionKeeper::new(mempool_node_config)),
54            mempool_db,
55            input_buffer: InputBuffer::default(),
56            top_known_anchor: RoundWatch::default(),
57        })
58    }
59
60    async fn process_state_update(
61        &self,
62        keeper: &mut StdSessionKeeper,
63        new_cx: &StateUpdateContext,
64    ) -> Result<()> {
65        // method is called in a for-cycle, so `seq_no` may differ
66        let span = tracing::error_span!("tka", seq_no = new_cx.top_processed_to_anchor_id);
67
68        let has_session_after_update = keeper.has_session_after_update(&self.cache, new_cx);
69
70        if !has_session_after_update.instrument(span.clone()).await? {
71            (self.net_args.moderator.wait_init().instrument(span.clone())).await?;
72
73            let _guard = span.enter();
74            let merged_config = keeper.config_builder.build()?;
75            keeper.engine_session = Some(self.start(&merged_config, new_cx)?);
76        }
77        Ok(())
78    }
79
80    /// Runs mempool engine session
81    fn start(
82        &self,
83        merged_conf: &MempoolMergedConfig,
84        ctx: &StateUpdateContext,
85    ) -> Result<EngineSession> {
86        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Starting mempool engine...");
87
88        let (anchors_tx, anchors_rx) = mpsc::unbounded_channel();
89
90        self.input_buffer.apply_config(&merged_conf.conf.consensus);
91
92        // Note: mempool is always run from applied mc block
93        self.top_known_anchor
94            .set_max_raw(ctx.top_processed_to_anchor_id);
95
96        let commit_finished = RoundWatch::default();
97
98        let bind = EngineBinding {
99            mempool_db: self.mempool_db.clone(),
100            input_buffer: self.input_buffer.clone(),
101            top_known_anchor: self.top_known_anchor.clone(),
102            commit_finished: commit_finished.clone(),
103            anchors_tx,
104        };
105
106        // actual oldest sync round will be not less than this
107        let estimated_sync_bottom = ctx
108            .top_processed_to_anchor_id
109            .saturating_sub(merged_conf.conf.consensus.reset_rounds())
110            .max(merged_conf.genesis_info.start_round);
111        anyhow::ensure!(
112            estimated_sync_bottom >= ctx.consensus_info.prev_vset_switch_round,
113            "cannot start from outdated peer sets (too short mempool epoch(s)): \
114                 estimated sync bottom {estimated_sync_bottom} \
115                 is older than prev vset switch round {}; \
116                 start round {}, top processed to anchor {} in block {}",
117            ctx.consensus_info.prev_vset_switch_round,
118            merged_conf.genesis_info.start_round,
119            ctx.top_processed_to_anchor_id,
120            ctx.mc_block_id,
121        );
122
123        let init_peers = VSetAdapter::init_peers(ctx)?;
124        if init_peers.curr_v_set.len() == 1 {
125            anyhow::bail!("pass `single-node` cli flag to run network of 1 node");
126        } else if init_peers.curr_v_set.len() == 2 {
127            anyhow::bail!("cannot run mempool with 2 nodes, gen network with either 1 or 3 nodes");
128        };
129
130        let (engine_stop_tx, mut engine_stop_rx) = oneshot::channel();
131        let session = EngineSession::new(
132            bind,
133            &self.net_args,
134            merged_conf,
135            init_peers,
136            engine_stop_tx,
137        );
138
139        let mut anchors_task = StdAnchorHandler::new(
140            anchors_rx,
141            commit_finished,
142            &self.cache,
143            &merged_conf.conf.consensus,
144        )
145        .run(self.mempool_db.clone())
146        .boxed();
147
148        tokio::spawn(async move {
149            tokio::select! {
150                handler_result = &mut anchors_task => match handler_result {
151                    Err(error) => tracing::warn!(
152                        target: tracing_targets::MEMPOOL_ADAPTER,
153                        "Mempool anchor handler stopped: {error}"
154                    )
155                },
156                engine_result = &mut engine_stop_rx => match engine_result {
157                    Ok(()) => tracing::info!(
158                        target: tracing_targets::MEMPOOL_ADAPTER,
159                        "Mempool main task is stopped: some subtask was cancelled"
160                    ),
161                    Err(_recv_error) => tracing::info!(
162                        target: tracing_targets::MEMPOOL_ADAPTER,
163                        "Mempool main task is cancelled"
164                    ),
165                },
166            }
167        });
168
169        tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Mempool started");
170
171        Ok(session)
172    }
173}