tycho_collator/mempool/
mod.rs1mod state_update_context;
2
3use std::sync::Arc;
4
5use anyhow::Result;
6use async_trait::async_trait;
7use bytes::Bytes;
8use tycho_network::PeerId;
9use tycho_types::models::*;
10use tycho_types::prelude::*;
11
12pub use self::impls::*;
13pub use self::state_update_context::*;
14use crate::types::processed_upto::BlockSeqno;
15
16mod impls {
17 pub use self::dump_anchors::{DumpAnchors, DumpedAnchor};
18 pub use self::single_node_impl::MempoolAdapterSingleNodeImpl;
19 pub use self::std_impl::MempoolAdapterStdImpl;
20 pub use self::stub_impl::MempoolAdapterStubImpl;
21 #[cfg(test)]
22 pub(crate) use self::stub_impl::{make_stub_anchor, make_stub_external};
23
24 mod common;
25 mod dump_anchors;
26 mod single_node_impl;
27 mod std_impl;
28 mod stub_impl;
29}
30
31pub trait MempoolAdapterFactory {
34 fn create(self, listener: Arc<dyn MempoolEventListener>) -> Arc<dyn MempoolAdapter>;
35}
36
37impl<F, R> MempoolAdapterFactory for F
38where
39 F: FnOnce(Arc<dyn MempoolEventListener>) -> Arc<R>,
40 R: MempoolAdapter,
41{
42 fn create(self, listener: Arc<dyn MempoolEventListener>) -> Arc<dyn MempoolAdapter> {
43 self(listener)
44 }
45}
46
47#[async_trait]
50pub trait MempoolEventListener: Send + Sync {
51 async fn on_new_anchor(&self, anchor: Arc<MempoolAnchor>) -> Result<()>;
53}
54
55#[async_trait]
58pub trait MempoolAdapter: Send + Sync + 'static {
59 async fn handle_mc_state_update(&self, cx: Box<StateUpdateContext>) -> Result<()>;
64
65 async fn handle_signed_mc_block(&self, mc_block_seqno: BlockSeqno) -> Result<()>;
71
72 async fn get_anchor_by_id(&self, anchor_id: MempoolAnchorId) -> Result<GetAnchorResult>;
75
76 async fn get_next_anchor(&self, prev_anchor_id: MempoolAnchorId) -> Result<GetAnchorResult>;
80
81 fn clear_anchors_cache(&self, before_anchor_id: MempoolAnchorId) -> Result<()>;
85
86 fn accept_external(&self, message: Bytes);
88
89 async fn update_delayed_config(
92 &self,
93 consensus_config: Option<&ConsensusConfig>,
94 genesis_info: &GenesisInfo,
95 ) -> Result<()>;
96}
97
98impl MempoolAdapterFactory for Arc<dyn MempoolAdapter> {
99 fn create(self, _listener: Arc<dyn MempoolEventListener>) -> Arc<dyn MempoolAdapter> {
100 self
101 }
102}
103
104pub type MempoolAnchorId = u32;
107
108#[derive(Debug)]
109pub struct ExternalMessage {
110 pub cell: Cell,
111 pub info: ExtInMsgInfo,
112}
113
114impl ExternalMessage {
115 pub fn hash(&self) -> &HashBytes {
116 self.cell.repr_hash()
117 }
118}
119
120#[derive(Debug)]
121pub struct MempoolAnchor {
122 pub id: MempoolAnchorId,
123 pub prev_id: Option<MempoolAnchorId>,
125 pub author: PeerId,
126 pub chain_time: u64,
127 pub externals: Vec<Arc<ExternalMessage>>,
128}
129
130impl MempoolAnchor {
131 pub fn count_externals_for(&self, shard_id: &ShardIdent, offset: usize) -> usize {
132 self.externals
133 .iter()
134 .skip(offset)
135 .filter(|ext| shard_id.contains_address(&ext.info.dst))
136 .count()
137 }
138
139 pub fn has_externals_for(&self, shard_id: &ShardIdent, offset: usize) -> bool {
140 self.externals
141 .iter()
142 .skip(offset)
143 .any(|ext| shard_id.contains_address(&ext.info.dst))
144 }
145
146 pub fn iter_externals(
147 &self,
148 from_idx: usize,
149 ) -> impl Iterator<Item = Arc<ExternalMessage>> + '_ {
150 self.externals.iter().skip(from_idx).cloned()
151 }
152}
153
154pub enum GetAnchorResult {
155 NotExist,
156 Exist(Arc<MempoolAnchor>),
157}
158
159impl GetAnchorResult {
160 pub fn anchor(&self) -> Option<&MempoolAnchor> {
161 match self {
162 Self::Exist(arc) => Some(arc),
163 Self::NotExist => None,
164 }
165 }
166}