1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use crate::{StateCommitter, TransactionCommitter, TransactionExecutor};
use aptos_logger::info;
use aptos_types::transaction::{Transaction, Version};
use aptos_vm::AptosVM;
use executor::block_executor::BlockExecutor;
use executor_types::BlockExecutorTrait;
use std::{
    sync::{mpsc, Arc},
    thread::JoinHandle,
};
use storage_interface::DbReaderWriter;

/// Owner of the worker threads started by [`Pipeline::new`].
///
/// The struct itself carries no state besides the thread handles; its only
/// purpose is to let the caller wait for the threads via [`Pipeline::join`].
pub struct Pipeline {
    /// Handles of every thread spawned by `new`, consumed by `join`.
    join_handles: Vec<JoinHandle<()>>,
}

impl Pipeline {
    pub fn new(
        db: DbReaderWriter,
        executor: BlockExecutor<AptosVM>,
        version: Version,
    ) -> (Self, mpsc::SyncSender<Vec<Transaction>>) {
        let parent_block_id = executor.committed_block_id();
        let base_smt = executor.root_smt();
        let executor_1 = Arc::new(executor);
        let executor_2 = executor_1.clone();

        let (block_sender, block_receiver) =
            mpsc::sync_channel::<Vec<Transaction>>(50 /* bound */);
        let (commit_sender, commit_receiver) = mpsc::sync_channel(3 /* bound */);
        let (state_commit_sender, state_commit_receiver) = mpsc::sync_channel(5 /* bound */);

        let exe_thread = std::thread::Builder::new()
            .name("txn_executor".to_string())
            .spawn(move || {
                let mut exe = TransactionExecutor::new(
                    executor_1,
                    parent_block_id,
                    version,
                    Some(commit_sender),
                );
                while let Ok(transactions) = block_receiver.recv() {
                    info!("Received block of size {:?} to execute", transactions.len());
                    exe.execute_block(transactions);
                }
            })
            .expect("Failed to spawn transaction executor thread.");
        let commit_thread = std::thread::Builder::new()
            .name("txn_committer".to_string())
            .spawn(move || {
                let mut committer = TransactionCommitter::new(
                    executor_2,
                    version,
                    commit_receiver,
                    state_commit_sender,
                );
                committer.run();
            })
            .expect("Failed to spawn transaction committer thread.");
        let db_writer = db.writer.clone();
        let state_commit_thread = std::thread::Builder::new()
            .name("state_committer".to_string())
            .spawn(move || {
                let committer =
                    StateCommitter::new(state_commit_receiver, db_writer, base_smt, Some(version));
                committer.run();
            })
            .expect("Failed to spawn state committer thread.");

        let join_handles = vec![exe_thread, commit_thread, state_commit_thread];

        (Self { join_handles }, block_sender)
    }

    pub fn join(self) {
        for handle in self.join_handles {
            handle.join().unwrap()
        }
    }
}