tycho_collator/mempool/impls/std_impl/
mod.rs1mod adapter_impl;
2mod anchor_handler;
3mod session_keeper;
4mod state_update_queue;
5
6use std::sync::Arc;
7
8use anyhow::Result;
9use futures_util::FutureExt;
10use tokio::sync::{Mutex, mpsc, oneshot};
11use tracing::Instrument;
12use tycho_consensus::prelude::*;
13use tycho_crypto::ed25519::KeyPair;
14use tycho_network::{Network, OverlayService, PeerResolver};
15
16use crate::mempool::StateUpdateContext;
17use crate::mempool::impls::common::cache::Cache;
18use crate::mempool::impls::common::v_set_adapter::VSetAdapter;
19use crate::mempool::impls::std_impl::anchor_handler::StdAnchorHandler;
20use crate::mempool::impls::std_impl::session_keeper::StdSessionKeeper;
21use crate::tracing_targets;
22
23pub struct MempoolAdapterStdImpl {
24 cache: Arc<Cache>,
25 net_args: EngineNetworkArgs,
26
27 keeper: Mutex<StdSessionKeeper>,
28
29 mempool_db: Arc<MempoolDb>,
30 input_buffer: InputBuffer,
31 top_known_anchor: RoundWatch<TopKnownAnchor>,
32}
33
34impl MempoolAdapterStdImpl {
35 pub fn new(
36 key_pair: Arc<KeyPair>,
37 network: &Network,
38 peer_resolver: &PeerResolver,
39 overlay_service: &OverlayService,
40 mempool_db: Arc<MempoolDb>,
41 moderator: Moderator,
42 mempool_node_config: &MempoolNodeConfig,
43 ) -> Result<Self> {
44 Ok(Self {
45 cache: Default::default(),
46 net_args: EngineNetworkArgs {
47 key_pair,
48 network: network.clone(),
49 peer_resolver: peer_resolver.clone(),
50 overlay_service: overlay_service.clone(),
51 moderator,
52 },
53 keeper: Mutex::new(StdSessionKeeper::new(mempool_node_config)),
54 mempool_db,
55 input_buffer: InputBuffer::default(),
56 top_known_anchor: RoundWatch::default(),
57 })
58 }
59
60 async fn process_state_update(
61 &self,
62 keeper: &mut StdSessionKeeper,
63 new_cx: &StateUpdateContext,
64 ) -> Result<()> {
65 let span = tracing::error_span!("tka", seq_no = new_cx.top_processed_to_anchor_id);
67
68 let has_session_after_update = keeper.has_session_after_update(&self.cache, new_cx);
69
70 if !has_session_after_update.instrument(span.clone()).await? {
71 (self.net_args.moderator.wait_init().instrument(span.clone())).await?;
72
73 let _guard = span.enter();
74 let merged_config = keeper.config_builder.build()?;
75 keeper.engine_session = Some(self.start(&merged_config, new_cx)?);
76 }
77 Ok(())
78 }
79
80 fn start(
82 &self,
83 merged_conf: &MempoolMergedConfig,
84 ctx: &StateUpdateContext,
85 ) -> Result<EngineSession> {
86 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Starting mempool engine...");
87
88 let (anchors_tx, anchors_rx) = mpsc::unbounded_channel();
89
90 self.input_buffer.apply_config(&merged_conf.conf.consensus);
91
92 self.top_known_anchor
94 .set_max_raw(ctx.top_processed_to_anchor_id);
95
96 let commit_finished = RoundWatch::default();
97
98 let bind = EngineBinding {
99 mempool_db: self.mempool_db.clone(),
100 input_buffer: self.input_buffer.clone(),
101 top_known_anchor: self.top_known_anchor.clone(),
102 commit_finished: commit_finished.clone(),
103 anchors_tx,
104 };
105
106 let estimated_sync_bottom = ctx
108 .top_processed_to_anchor_id
109 .saturating_sub(merged_conf.conf.consensus.reset_rounds())
110 .max(merged_conf.genesis_info.start_round);
111 anyhow::ensure!(
112 estimated_sync_bottom >= ctx.consensus_info.prev_vset_switch_round,
113 "cannot start from outdated peer sets (too short mempool epoch(s)): \
114 estimated sync bottom {estimated_sync_bottom} \
115 is older than prev vset switch round {}; \
116 start round {}, top processed to anchor {} in block {}",
117 ctx.consensus_info.prev_vset_switch_round,
118 merged_conf.genesis_info.start_round,
119 ctx.top_processed_to_anchor_id,
120 ctx.mc_block_id,
121 );
122
123 let init_peers = VSetAdapter::init_peers(ctx)?;
124 if init_peers.curr_v_set.len() == 1 {
125 anyhow::bail!("pass `single-node` cli flag to run network of 1 node");
126 } else if init_peers.curr_v_set.len() == 2 {
127 anyhow::bail!("cannot run mempool with 2 nodes, gen network with either 1 or 3 nodes");
128 };
129
130 let (engine_stop_tx, mut engine_stop_rx) = oneshot::channel();
131 let session = EngineSession::new(
132 bind,
133 &self.net_args,
134 merged_conf,
135 init_peers,
136 engine_stop_tx,
137 );
138
139 let mut anchors_task = StdAnchorHandler::new(
140 anchors_rx,
141 commit_finished,
142 &self.cache,
143 &merged_conf.conf.consensus,
144 )
145 .run(self.mempool_db.clone())
146 .boxed();
147
148 tokio::spawn(async move {
149 tokio::select! {
150 handler_result = &mut anchors_task => match handler_result {
151 Err(error) => tracing::warn!(
152 target: tracing_targets::MEMPOOL_ADAPTER,
153 "Mempool anchor handler stopped: {error}"
154 )
155 },
156 engine_result = &mut engine_stop_rx => match engine_result {
157 Ok(()) => tracing::info!(
158 target: tracing_targets::MEMPOOL_ADAPTER,
159 "Mempool main task is stopped: some subtask was cancelled"
160 ),
161 Err(_recv_error) => tracing::info!(
162 target: tracing_targets::MEMPOOL_ADAPTER,
163 "Mempool main task is cancelled"
164 ),
165 },
166 }
167 });
168
169 tracing::info!(target: tracing_targets::MEMPOOL_ADAPTER, "Mempool started");
170
171 Ok(session)
172 }
173}