Skip to main content

tycho_collator/mempool/
mod.rs

1mod state_update_context;
2
3use std::sync::Arc;
4
5use anyhow::Result;
6use async_trait::async_trait;
7use bytes::Bytes;
8use tycho_network::PeerId;
9use tycho_types::models::*;
10use tycho_types::prelude::*;
11
12pub use self::impls::*;
13pub use self::state_update_context::*;
14use crate::types::processed_upto::BlockSeqno;
15
16mod impls {
17    pub use self::dump_anchors::{DumpAnchors, DumpedAnchor};
18    pub use self::single_node_impl::MempoolAdapterSingleNodeImpl;
19    pub use self::std_impl::MempoolAdapterStdImpl;
20    pub use self::stub_impl::MempoolAdapterStubImpl;
21    #[cfg(test)]
22    pub(crate) use self::stub_impl::{make_stub_anchor, make_stub_external};
23
24    mod common;
25    mod dump_anchors;
26    mod single_node_impl;
27    mod std_impl;
28    mod stub_impl;
29}
30
31// === Factory ===
32
/// Factory that produces a [`MempoolAdapter`] for a given event listener.
///
/// This file implements it for closures returning `Arc<R>` where `R: MempoolAdapter`
/// and for an already constructed `Arc<dyn MempoolAdapter>`.
pub trait MempoolAdapterFactory {
    /// Consumes the factory and returns the adapter as a trait object.
    ///
    /// `listener` is handed to the factory so that the created adapter can report
    /// mempool events to it; whether it is actually used depends on the implementation.
    fn create(self, listener: Arc<dyn MempoolEventListener>) -> Arc<dyn MempoolAdapter>;
}
36
37impl<F, R> MempoolAdapterFactory for F
38where
39    F: FnOnce(Arc<dyn MempoolEventListener>) -> Arc<R>,
40    R: MempoolAdapter,
41{
42    fn create(self, listener: Arc<dyn MempoolEventListener>) -> Arc<dyn MempoolAdapter> {
43        self(listener)
44    }
45}
46
47// === Events Listener ===
48
/// Receiver of events coming from the mempool.
///
/// A listener is passed to [`MempoolAdapterFactory::create`] as
/// `Arc<dyn MempoolEventListener>`, hence the `Send + Sync` bounds.
#[async_trait]
pub trait MempoolEventListener: Send + Sync {
    /// Process a new anchor received from the mempool.
    ///
    /// Returns an error if the listener failed to handle the anchor.
    async fn on_new_anchor(&self, anchor: Arc<MempoolAnchor>) -> Result<()>;
}
54
55// === Adapter ===
56
/// Interface through which the rest of the crate drives a mempool implementation:
/// state updates, anchor retrieval, cache cleanup and external message intake.
#[async_trait]
pub trait MempoolAdapter: Send + Sync + 'static {
    /// Process updates related to a master block:
    /// 1. Mempool switch round
    /// 2. Mempool config
    /// 3. Validator sets
    async fn handle_mc_state_update(&self, cx: Box<StateUpdateContext>) -> Result<()>;

    /// Process a state update that was reported by the collation manager earlier.
    ///
    /// Applies vset and config changes to the mempool and starts the mempool on the first call.
    /// Advances the mempool pause bound, which allows the mempool to resume its work.
    /// The mempool should be ready to return the mc block `processed_up_to` anchor
    /// and all anchors after it.
    /// This method does not clean the anchor cache.
    async fn handle_signed_mc_block(&self, mc_block_seqno: BlockSeqno) -> Result<()>;

    /// Request, await, and return an anchor from the connected mempool by its id.
    ///
    /// Returns [`GetAnchorResult::NotExist`] if the requested anchor does not exist
    /// and cannot be synced from other nodes.
    async fn get_anchor_by_id(&self, anchor_id: MempoolAnchorId) -> Result<GetAnchorResult>;

    /// Request, await, and return the next anchor after the specified previous one.
    ///
    /// If the anchor does not exist yet, waits until it is produced or downloaded during sync.
    /// Returns [`GetAnchorResult::NotExist`] if the anchor cannot be produced
    /// or synced from other nodes.
    async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<GetAnchorResult>;

    /// Remove from the cache all anchors that precede the specified one.
    ///
    /// This is safe for anchors that were processed in blocks included
    /// in a signed master block: they are not needed anymore.
    fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()>;

    /// Enqueue an external message to be consumed and processed by the mempool.
    fn accept_external(&self, message: Bytes);

    /// Update the delayed consensus config and genesis info.
    ///
    /// **Warning:** changes from `GlobalConfig` may be overwritten by an applied mc state,
    /// but only if that mc state has a greater time and a greater-or-equal round.
    async fn update_delayed_config(
        &self,
        consensus_config: Option<&ConsensusConfig>,
        genesis_info: &GenesisInfo,
    ) -> Result<()>;
}
97
/// Lets an already constructed adapter be passed wherever a factory is expected.
impl MempoolAdapterFactory for Arc<dyn MempoolAdapter> {
    /// Returns the adapter itself; the listener is ignored by this implementation.
    fn create(self, _listener: Arc<dyn MempoolEventListener>) -> Arc<dyn MempoolAdapter> {
        self
    }
}
103
104// === Types ===
105
/// Identifier of a mempool anchor (see `MempoolAnchor::id` and `MempoolAnchor::prev_id`).
pub type MempoolAnchorId = u32;
107
/// External message as stored in the `externals` list of a [`MempoolAnchor`].
#[derive(Debug)]
pub struct ExternalMessage {
    /// Cell of the message; its representation hash is returned by [`ExternalMessage::hash`].
    pub cell: Cell,
    /// Message info; its `dst` address is what the [`MempoolAnchor`] helpers
    /// match against a shard.
    pub info: ExtInMsgInfo,
}
113
114impl ExternalMessage {
115    pub fn hash(&self) -> &HashBytes {
116        self.cell.repr_hash()
117    }
118}
119
/// Anchor received from the mempool together with the external messages it carries.
#[derive(Debug)]
pub struct MempoolAnchor {
    /// Identifier of this anchor.
    pub id: MempoolAnchorId,
    /// Identifier of the previous anchor; `None` when this anchor comes right after
    /// Genesis or right after an unrecoverable gap.
    pub prev_id: Option<MempoolAnchorId>,
    /// Peer that authored the anchor.
    pub author: PeerId,
    /// Chain time of the anchor (NOTE(review): the unit is not visible here — confirm).
    pub chain_time: u64,
    /// External messages of the anchor; the helper methods address them by index.
    pub externals: Vec<Arc<ExternalMessage>>,
}
129
130impl MempoolAnchor {
131    pub fn count_externals_for(&self, shard_id: &ShardIdent, offset: usize) -> usize {
132        self.externals
133            .iter()
134            .skip(offset)
135            .filter(|ext| shard_id.contains_address(&ext.info.dst))
136            .count()
137    }
138
139    pub fn has_externals_for(&self, shard_id: &ShardIdent, offset: usize) -> bool {
140        self.externals
141            .iter()
142            .skip(offset)
143            .any(|ext| shard_id.contains_address(&ext.info.dst))
144    }
145
146    pub fn iter_externals(
147        &self,
148        from_idx: usize,
149    ) -> impl Iterator<Item = Arc<ExternalMessage>> + '_ {
150        self.externals.iter().skip(from_idx).cloned()
151    }
152}
153
154pub enum GetAnchorResult {
155    NotExist,
156    Exist(Arc<MempoolAnchor>),
157}
158
159impl GetAnchorResult {
160    pub fn anchor(&self) -> Option<&MempoolAnchor> {
161        match self {
162            Self::Exist(arc) => Some(arc),
163            Self::NotExist => None,
164        }
165    }
166}