1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
use {
    crossbeam_channel::{Receiver, RecvTimeoutError},
    itertools::izip,
    solana_ledger::{
        blockstore::Blockstore,
        blockstore_processor::{TransactionStatusBatch, TransactionStatusMessage},
    },
    solana_runtime::bank::{
        Bank, InnerInstructionsList, NonceRollbackInfo, TransactionLogMessages,
    },
    solana_transaction_status::{InnerInstructions, Reward, TransactionStatusMeta},
    std::{
        sync::{
            atomic::{AtomicBool, AtomicU64, Ordering},
            Arc,
        },
        thread::{self, Builder, JoinHandle},
        time::Duration,
    },
};

pub struct TransactionStatusService {
    thread_hdl: JoinHandle<()>,
}

impl TransactionStatusService {
    #[allow(clippy::new_ret_no_self)]
    pub fn new(
        write_transaction_status_receiver: Receiver<TransactionStatusMessage>,
        max_complete_transaction_status_slot: Arc<AtomicU64>,
        blockstore: Arc<Blockstore>,
        exit: &Arc<AtomicBool>,
    ) -> Self {
        let exit = exit.clone();
        let thread_hdl = Builder::new()
            .name("solana-transaction-status-writer".to_string())
            .spawn(move || loop {
                if exit.load(Ordering::Relaxed) {
                    break;
                }
                if let Err(RecvTimeoutError::Disconnected) = Self::write_transaction_status_batch(
                    &write_transaction_status_receiver,
                    &max_complete_transaction_status_slot,
                    &blockstore,
                ) {
                    break;
                }
            })
            .unwrap();
        Self { thread_hdl }
    }

    fn write_transaction_status_batch(
        write_transaction_status_receiver: &Receiver<TransactionStatusMessage>,
        max_complete_transaction_status_slot: &Arc<AtomicU64>,
        blockstore: &Arc<Blockstore>,
    ) -> Result<(), RecvTimeoutError> {
        match write_transaction_status_receiver.recv_timeout(Duration::from_secs(1))? {
            TransactionStatusMessage::Batch(TransactionStatusBatch {
                bank,
                transactions,
                statuses,
                balances,
                token_balances,
                inner_instructions,
                transaction_logs,
                rent_debits,
            }) => {
                let slot = bank.slot();
                let inner_instructions_iter: Box<
                    dyn Iterator<Item = Option<InnerInstructionsList>>,
                > = if let Some(inner_instructions) = inner_instructions {
                    Box::new(inner_instructions.into_iter())
                } else {
                    Box::new(std::iter::repeat_with(|| None))
                };
                let transaction_logs_iter: Box<
                    dyn Iterator<Item = Option<TransactionLogMessages>>,
                > = if let Some(transaction_logs) = transaction_logs {
                    Box::new(transaction_logs.into_iter())
                } else {
                    Box::new(std::iter::repeat_with(|| None))
                };
                for (
                    transaction,
                    (status, nonce_rollback),
                    pre_balances,
                    post_balances,
                    pre_token_balances,
                    post_token_balances,
                    inner_instructions,
                    log_messages,
                    rent_debits,
                ) in izip!(
                    &transactions,
                    statuses,
                    balances.pre_balances,
                    balances.post_balances,
                    token_balances.pre_token_balances,
                    token_balances.post_token_balances,
                    inner_instructions_iter,
                    transaction_logs_iter,
                    rent_debits.into_iter(),
                ) {
                    if Bank::can_commit(&status) && !transaction.signatures.is_empty() {
                        let fee_calculator = nonce_rollback
                            .map(|nonce_rollback| nonce_rollback.fee_calculator())
                            .unwrap_or_else(|| {
                                bank.get_fee_calculator(&transaction.message().recent_blockhash)
                            })
                            .expect("FeeCalculator must exist");
                        let fee = fee_calculator.calculate_fee(transaction.message());
                        let (writable_keys, readonly_keys) =
                            transaction.message.get_account_keys_by_lock_type();

                        let inner_instructions = inner_instructions.map(|inner_instructions| {
                            inner_instructions
                                .into_iter()
                                .enumerate()
                                .map(|(index, instructions)| InnerInstructions {
                                    index: index as u8,
                                    instructions,
                                })
                                .filter(|i| !i.instructions.is_empty())
                                .collect()
                        });

                        let pre_token_balances = Some(pre_token_balances);
                        let post_token_balances = Some(post_token_balances);
                        let rewards = Some(
                            rent_debits
                                .0
                                .into_iter()
                                .map(|(pubkey, reward_info)| Reward {
                                    pubkey: pubkey.to_string(),
                                    lamports: reward_info.lamports,
                                    post_balance: reward_info.post_balance,
                                    reward_type: Some(reward_info.reward_type),
                                    commission: reward_info.commission,
                                })
                                .collect(),
                        );

                        blockstore
                            .write_transaction_status(
                                slot,
                                transaction.signatures[0],
                                writable_keys,
                                readonly_keys,
                                TransactionStatusMeta {
                                    status,
                                    fee,
                                    pre_balances,
                                    post_balances,
                                    inner_instructions,
                                    log_messages,
                                    pre_token_balances,
                                    post_token_balances,
                                    rewards,
                                },
                            )
                            .expect("Expect database write to succeed");
                    }
                }
            }
            TransactionStatusMessage::Freeze(slot) => {
                max_complete_transaction_status_slot.fetch_max(slot, Ordering::SeqCst);
            }
        }
        Ok(())
    }

    pub fn join(self) -> thread::Result<()> {
        self.thread_hdl.join()
    }
}