1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use crate::{
core_mempool::CoreMempool,
network::{MempoolNetworkEvents, MempoolNetworkSender},
shared_mempool::{
coordinator::{coordinator, gc_coordinator, snapshot_job},
types::{MempoolEventsReceiver, SharedMempool, SharedMempoolNotification},
},
QuorumStoreRequest,
};
use aptos_config::{config::NodeConfig, network_id::NetworkId};
use aptos_infallible::{Mutex, RwLock};
use event_notifications::ReconfigNotificationListener;
use futures::channel::mpsc::{self, Receiver, UnboundedSender};
use mempool_notifications::MempoolNotificationListener;
use network::application::storage::PeerMetadataStorage;
use std::{collections::HashMap, sync::Arc};
use storage_interface::DbReader;
use tokio::runtime::{Builder, Handle, Runtime};
use vm_validator::vm_validator::{TransactionValidation, VMValidator};
/// Builds a [`SharedMempool`] and spawns its background tasks on `executor`.
///
/// `mempool_network_handles` is split into two collections: a list of
/// `(NetworkId, events)` pairs, kept in input order, and a `NetworkId -> sender` map.
/// If the same `NetworkId` occurs more than once, the later sender replaces the earlier
/// one in the map, while every events entry is retained in the list.
///
/// Three tasks are then spawned, in this order:
///
/// 1. `coordinator`, which receives the `SharedMempool`, a clone of the executor handle,
///    the network events, `client_events`, `quorum_store_requests`, `mempool_listener`
///    and `mempool_reconfig_events`;
/// 2. `gc_coordinator`, which receives the mempool and
///    `config.mempool.system_transaction_gc_interval_ms`;
/// 3. `snapshot_job`, which receives the mempool and
///    `config.mempool.mempool_snapshot_interval_secs`.
///
/// The handles returned by `spawn` are not kept.
pub(crate) fn start_shared_mempool<V>(
    executor: &Handle,
    config: &NodeConfig,
    mempool: Arc<Mutex<CoreMempool>>,
    mempool_network_handles: Vec<(NetworkId, MempoolNetworkSender, MempoolNetworkEvents)>,
    client_events: MempoolEventsReceiver,
    quorum_store_requests: mpsc::Receiver<QuorumStoreRequest>,
    mempool_listener: MempoolNotificationListener,
    mempool_reconfig_events: ReconfigNotificationListener,
    db: Arc<dyn DbReader>,
    validator: Arc<RwLock<V>>,
    subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,
    peer_metadata_storage: Arc<PeerMetadataStorage>,
) where
    V: TransactionValidation + 'static,
{
    // One pass over the handles: the event stream goes to the list handed to the
    // coordinator, the sender goes to the per-network map owned by the SharedMempool.
    let (all_network_events, network_senders): (Vec<_>, HashMap<_, _>) = mempool_network_handles
        .into_iter()
        .map(|(network_id, sender, events)| ((network_id, events), (network_id, sender)))
        .unzip();

    let mempool_config = config.mempool.clone();
    let role = config.base.role;
    let smp = SharedMempool::new(
        Arc::clone(&mempool),
        mempool_config,
        network_senders,
        db,
        validator,
        subscribers,
        role,
        peer_metadata_storage,
    );

    // Main coordinator task; it gets its own executor handle.
    let coordinator_task = coordinator(
        smp,
        executor.clone(),
        all_network_events,
        client_events,
        quorum_store_requests,
        mempool_listener,
        mempool_reconfig_events,
    );
    executor.spawn(coordinator_task);

    // GC task shares the mempool through a cloned `Arc`.
    let gc_interval_ms = config.mempool.system_transaction_gc_interval_ms;
    executor.spawn(gc_coordinator(Arc::clone(&mempool), gc_interval_ms));

    // Snapshot task takes the last reference held by this function.
    let snapshot_interval_secs = config.mempool.mempool_snapshot_interval_secs;
    executor.spawn(snapshot_job(mempool, snapshot_interval_secs));
}
/// Creates the shared mempool runtime and starts the shared mempool on it.
///
/// A multi-threaded Tokio runtime is built with the thread name `"shared-mem"` and
/// `enable_all()`. A fresh [`CoreMempool`] is created from `config` and a
/// [`VMValidator`] from a clone of `db`; both are passed, together with the remaining
/// arguments and an empty subscriber list, to [`start_shared_mempool`].
///
/// Returns the runtime on which the shared mempool tasks were spawned.
///
/// # Panics
///
/// Panics with `"[shared mempool] failed to create runtime"` if the runtime cannot be
/// built.
pub fn bootstrap(
    config: &NodeConfig,
    db: Arc<dyn DbReader>,
    mempool_network_handles: Vec<(NetworkId, MempoolNetworkSender, MempoolNetworkEvents)>,
    client_events: MempoolEventsReceiver,
    quorum_store_requests: Receiver<QuorumStoreRequest>,
    mempool_listener: MempoolNotificationListener,
    mempool_reconfig_events: ReconfigNotificationListener,
    peer_metadata_storage: Arc<PeerMetadataStorage>,
) -> Runtime {
    // Configure the runtime builder step by step, then build it.
    let mut builder = Builder::new_multi_thread();
    builder.thread_name("shared-mem").enable_all();
    let rt = builder
        .build()
        .expect("[shared mempool] failed to create runtime");

    let core_mempool = Arc::new(Mutex::new(CoreMempool::new(config)));
    let validator = Arc::new(RwLock::new(VMValidator::new(Arc::clone(&db))));

    // No notification subscribers are registered when bootstrapping.
    let no_subscribers = Vec::new();

    start_shared_mempool(
        rt.handle(),
        config,
        core_mempool,
        mempool_network_handles,
        client_events,
        quorum_store_requests,
        mempool_listener,
        mempool_reconfig_events,
        db,
        validator,
        no_subscribers,
        peer_metadata_storage,
    );

    rt
}