1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
use crate::{StateCommitter, TransactionCommitter, TransactionExecutor};
use aptos_logger::info;
use aptos_types::transaction::{Transaction, Version};
use aptos_vm::AptosVM;
use executor::block_executor::BlockExecutor;
use executor_types::BlockExecutorTrait;
use std::{
sync::{mpsc, Arc},
thread::JoinHandle,
};
use storage_interface::DbReaderWriter;
/// Owner of the worker threads started by `Pipeline::new`.
///
/// The value only stores the threads' join handles; call `Pipeline::join` to wait for
/// all of them to finish.
#[derive(Debug)]
pub struct Pipeline {
    /// Join handles of the spawned threads, kept in the order the threads were started.
    join_handles: Vec<JoinHandle<()>>,
}
impl Pipeline {
    /// Wires up the pipeline stages and starts one named thread for each of them.
    ///
    /// Three bounded `mpsc` channels are created:
    /// - the block channel (capacity 50), whose sender is returned to the caller and whose
    ///   receiver is drained by the executor thread;
    /// - a channel (capacity 3) whose sender is given to the `TransactionExecutor` and
    ///   whose receiver is given to the `TransactionCommitter`;
    /// - a channel (capacity 5) whose sender is given to the `TransactionCommitter` and
    ///   whose receiver is given to the `StateCommitter`.
    ///
    /// Threads are spawned in this order:
    /// - `txn_executor`: builds a `TransactionExecutor` and calls `execute_block` on every
    ///   block it receives, until the block channel hangs up;
    /// - `txn_committer`: builds a `TransactionCommitter` and calls `run` on it;
    /// - `state_committer`: builds a `StateCommitter` and calls `run` on it.
    ///
    /// Returns the pipeline, which holds the three join handles, together with the sending
    /// half of the block channel.
    ///
    /// # Panics
    ///
    /// Panics if any of the three threads cannot be spawned.
    pub fn new(
        db: DbReaderWriter,
        executor: BlockExecutor<AptosVM>,
        version: Version,
    ) -> (Self, mpsc::SyncSender<Vec<Transaction>>) {
        // Capacities of the bounded channels that connect the stages.
        const BLOCK_CHANNEL_BOUND: usize = 50;
        const COMMIT_CHANNEL_BOUND: usize = 3;
        const STATE_COMMIT_CHANNEL_BOUND: usize = 5;

        // Both values are read from the executor before it is moved into the `Arc`.
        let parent_block_id = executor.committed_block_id();
        let base_smt = executor.root_smt();

        // One shared executor: a handle for the execution stage, one for the commit stage.
        let executor_for_commit = Arc::new(executor);
        let executor_for_execution = Arc::clone(&executor_for_commit);

        let (block_sender, block_receiver) =
            mpsc::sync_channel::<Vec<Transaction>>(BLOCK_CHANNEL_BOUND);
        let (commit_sender, commit_receiver) = mpsc::sync_channel(COMMIT_CHANNEL_BOUND);
        let (state_commit_sender, state_commit_receiver) =
            mpsc::sync_channel(STATE_COMMIT_CHANNEL_BOUND);

        let execution_stage = std::thread::Builder::new()
            .name("txn_executor".to_string())
            .spawn(move || {
                let mut block_runner = TransactionExecutor::new(
                    executor_for_execution,
                    parent_block_id,
                    version,
                    Some(commit_sender),
                );
                // `iter()` blocks for the next block and ends once the channel hangs up.
                for transactions in block_receiver.iter() {
                    info!("Received block of size {:?} to execute", transactions.len());
                    block_runner.execute_block(transactions);
                }
            })
            .expect("Failed to spawn transaction executor thread.");

        let commit_stage = std::thread::Builder::new()
            .name("txn_committer".to_string())
            .spawn(move || {
                TransactionCommitter::new(
                    executor_for_commit,
                    version,
                    commit_receiver,
                    state_commit_sender,
                )
                .run();
            })
            .expect("Failed to spawn transaction committer thread.");

        // Only a clone of the writer handle is moved into the state committer thread.
        let db_writer = db.writer.clone();
        let state_commit_stage = std::thread::Builder::new()
            .name("state_committer".to_string())
            .spawn(move || {
                StateCommitter::new(state_commit_receiver, db_writer, base_smt, Some(version))
                    .run();
            })
            .expect("Failed to spawn state committer thread.");

        let pipeline = Self {
            join_handles: vec![execution_stage, commit_stage, state_commit_stage],
        };
        (pipeline, block_sender)
    }

    /// Waits for every pipeline thread to finish, in the order the threads were started.
    ///
    /// # Panics
    ///
    /// Panics if joining a thread returns an error, i.e. that thread panicked.
    pub fn join(self) {
        self.join_handles
            .into_iter()
            .for_each(|worker| worker.join().unwrap());
    }
}