Skip to main content

tycho_consensus/engine/
mempool_config.rs

1use std::num::{NonZeroU8, NonZeroU16};
2use std::sync::OnceLock;
3
4use anyhow::{Context, Result, ensure};
5use serde::{Deserialize, Serialize};
6use tycho_crypto::ed25519::{KeyPair, SecretKey};
7use tycho_network::{OverlayId, PeerId};
8use tycho_types::models::{ConsensusConfig, GenesisInfo};
9
10use crate::dag::AnchorStage;
11use crate::models::{Link, Point, PointData, Round, UnixTime};
12
13// replace with `ArcSwapOption` + copy on get() if need to change in runtime
14static NODE_CONFIG: OnceLock<MempoolNodeConfig> = OnceLock::new();
15pub struct NodeConfig;
16impl NodeConfig {
17    pub fn get() -> &'static MempoolNodeConfig {
18        (NODE_CONFIG.get()).expect("mempool node config not initialized")
19    }
20}
21
22/// values that can be changed in runtime via key block, private to crate
23#[derive(Clone, Debug, PartialEq)]
24pub struct MempoolConfig {
25    pub consensus: ConsensusConfig,
26    pub genesis_round: Round,
27    /// Estimated hard limit on serialized point size
28    pub point_max_bytes: usize,
29}
30
31/// visible outside crate and used as DTO
32#[derive(Clone, Debug)]
33pub struct MempoolMergedConfig {
34    pub genesis_info: GenesisInfo,
35    pub conf: MempoolConfig,
36    pub(crate) overlay_id: OverlayId,
37}
38
39impl MempoolMergedConfig {
40    pub(crate) fn genesis_author(&self) -> PeerId {
41        let key_pair = KeyPair::from(&SecretKey::from_bytes(self.overlay_id.0));
42        key_pair.public_key.into()
43    }
44
45    pub(crate) fn genesis(&self) -> Point {
46        let key_pair = KeyPair::from(&SecretKey::from_bytes(self.overlay_id.0));
47        let millis = UnixTime::from_millis(self.genesis_info.genesis_millis);
48        Point::new(
49            &key_pair,
50            PeerId::from(key_pair.public_key),
51            self.conf.genesis_round,
52            Default::default(),
53            PointData {
54                time: millis,
55                includes: Default::default(),
56                witness: Default::default(),
57                evidence: Default::default(),
58                anchor_trigger: Link::ToSelf,
59                anchor_proof: Link::ToSelf,
60                anchor_time: millis,
61            },
62            &self.conf,
63        )
64    }
65}
66
67#[derive(Debug, Clone)]
68pub struct MempoolConfigBuilder {
69    genesis_info: Option<GenesisInfo>,
70    consensus_config: Option<ConsensusConfig>,
71}
72
73impl MempoolConfigBuilder {
74    pub fn new(node_config: &MempoolNodeConfig) -> Self {
75        if NODE_CONFIG.get_or_init(|| node_config.clone()) != node_config {
76            tracing::error!(
77                "mempool node config was not changed; using prev {:?} ignored new {:?}",
78                NodeConfig::get(),
79                node_config,
80            );
81        };
82        Self {
83            genesis_info: None,
84            consensus_config: None,
85        }
86    }
87
88    pub fn set_consensus_config(&mut self, consensus_config: &ConsensusConfig) -> Result<()> {
89        ensure!(
90            consensus_config.max_consensus_lag_rounds
91                >= consensus_config.commit_history_rounds.into(),
92            "max consensus lag must be greater than commit depth"
93        );
94
95        ensure!(
96            consensus_config.payload_buffer_bytes >= consensus_config.payload_batch_bytes,
97            "no need to evict cached externals if can send them in one message"
98        );
99
100        self.consensus_config = Some(consensus_config.clone());
101        Ok(())
102    }
103
104    pub fn set_genesis(&mut self, info: GenesisInfo) {
105        self.genesis_info = Some(info);
106    }
107
108    pub fn get_consensus_config(&self) -> Option<&ConsensusConfig> {
109        self.consensus_config.as_ref()
110    }
111
112    pub fn get_genesis(&self) -> Option<GenesisInfo> {
113        self.genesis_info
114    }
115
116    pub fn build(&self) -> Result<MempoolMergedConfig> {
117        let genesis_info = *self
118            .genesis_info
119            .as_ref()
120            .context("mempool genesis info for config is not known")?;
121
122        let consensus = self
123            .consensus_config
124            .as_ref()
125            .context("mempool consensus config is not known")?;
126
127        let genesis_round = AnchorStage::align_genesis(genesis_info.start_round);
128
129        let mempool_config = MempoolConfig {
130            consensus: consensus.clone(),
131            genesis_round,
132            point_max_bytes: Point::max_byte_size(consensus),
133        };
134
135        // reset types to u128 as it does not match fields in `ConsensusConfig`
136        // and may be changed just to keep them handy, that must not affect hash
137        let mut hasher = blake3::Hasher::new();
138        // unaligned `genesis_info.start_round` is not used
139        hasher.update(&(genesis_round.0 as u128).to_be_bytes());
140        hasher.update(&(genesis_info.genesis_millis as u128).to_be_bytes());
141        hasher.update(&(consensus.clock_skew_millis.get() as u128).to_be_bytes());
142        hasher.update(&(consensus.payload_batch_bytes.get() as u128).to_be_bytes());
143        hasher.update(&(consensus.commit_history_rounds.get() as u128).to_be_bytes());
144        hasher.update(&(consensus.deduplicate_rounds as u128).to_be_bytes());
145        hasher.update(&(consensus.max_consensus_lag_rounds.get() as u128).to_be_bytes());
146        hasher.update(&(consensus.download_peer_queries.get() as u128).to_be_bytes());
147        hasher.update(&(consensus.sync_support_rounds.get() as u128).to_be_bytes());
148
149        let overlay_id = OverlayId(hasher.finalize().into());
150
151        Ok(MempoolMergedConfig {
152            genesis_info,
153            conf: mempool_config,
154            overlay_id,
155        })
156    }
157}
158
159#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
160#[serde(default)]
161pub struct MempoolNodeConfig {
162    /// `true` to truncate hashes, signatures and use non-standard format for large structs
163    /// that may be more readable
164    pub log_truncate_long_values: bool,
165
166    /// `true` to export metrics on how the current node views every other's points,
167    /// this is `NxN` amount of metrics (N labels from each of N nodes)
168    pub emit_stats_metrics: bool,
169
170    /// How often (in rounds) delete obsolete data and trigger rocksDB compaction.
171    pub clean_db_period_rounds: NonZeroU16,
172
173    /// amount of future [Round]s (beyond current [`Dag`](crate::dag::DagHead))
174    /// that [`BroadcastFilter`](crate::intercom::BroadcastFilter) caches
175    /// to extend [`Dag`](crate::engine::ConsensusConfigExt) without downloading points
176    pub cache_future_broadcasts_rounds: u16,
177
178    /// [`Tokio default upper limit`](tokio::runtime::Builder::max_blocking_threads) is 512.
179    ///
180    /// [`Tycho tread pool config`](tycho_util::cli::config::ThreadPoolConfig)
181    /// does not have a corresponding parameter and does not change tokio's default value.
182    pub max_blocking_tasks: NonZeroU16,
183
184    /// Max simultaneous point search tasks fulfilling download request
185    pub max_upload_tasks: NonZeroU8,
186
187    /// Limits amount of unique points being simultaneously downloaded from all peers.
188    pub max_download_tasks: NonZeroU16,
189}
190
191impl Default for MempoolNodeConfig {
192    fn default() -> Self {
193        Self {
194            log_truncate_long_values: true,
195            emit_stats_metrics: true,
196            clean_db_period_rounds: 105.try_into().unwrap(),
197            cache_future_broadcasts_rounds: 105,
198            max_blocking_tasks: 250.try_into().unwrap(),
199            max_upload_tasks: 50.try_into().unwrap(),
200            max_download_tasks: 250.try_into().unwrap(),
201        }
202    }
203}