ex3-block-builder 0.15.29

EX3 block report data structure.
Documentation
use crate::{BlockBuilder, SnapshotVaultIndex};
use ex3_balance_vault_public_types::{
    AcceptedWithdrawalShardingSubReport, CandidBalanceShardingSubReport,
    RejectedWithdrawalShardingSubReport,
};
use ex3_blockchain_public_types::{ActivatedBalanceVault, SnapshotCursor};
use ex3_canister_types::range::ReportRange;
use ex3_crypto::sha256;
use ex3_node_converter::convert_to_candid_balance_changed_records;
use ex3_node_types::balance_changed::BalanceChanged;
use ex3_node_types::range::CandidRange;
use ex3_node_types::transaction::EncodedTransaction;
use ex3_node_types::{AssetId, BalanceVaultSeqId, BlockHeight};
use ex3_node_types::{MerkleNode, WalletRegisterId};
use ex3_serde::bincode;
use ex3_serde::bincode::serialize;
use num_traits::{CheckedSub, One, ToPrimitive, Zero};
use rs_merkle::algorithms::Sha256;
use rs_merkle::MerkleTree;
use std::cmp::max;
use std::collections::{BTreeMap, VecDeque};

impl BlockBuilder {
    /// Generate balance sharding sub reports
    /// Note: include balance changed report & withdrawal tx report
    pub(crate) fn generate_balance_sharding_sub_reports(
        &self,
        balances_changed_map: BTreeMap<
            BalanceVaultSeqId,
            Vec<((WalletRegisterId, Vec<(AssetId, BalanceChanged)>), usize)>,
        >,
        balances_changed_merkle_tree: &MerkleTree<Sha256>,
        txs: BTreeMap<BalanceVaultSeqId, Vec<(EncodedTransaction, usize)>>,
        txs_merkle_tree: &MerkleTree<Sha256>,
        rejected_txs: BTreeMap<BalanceVaultSeqId, Vec<(EncodedTransaction, usize)>>,
        rejected_txs_merkle_tree: &MerkleTree<Sha256>,
        snapshot_vault_cursor_of_pre_height: &SnapshotCursor,
    ) -> BTreeMap<
        BalanceVaultSeqId,
        (
            Vec<CandidBalanceShardingSubReport>,
            Vec<AcceptedWithdrawalShardingSubReport>,
            Vec<RejectedWithdrawalShardingSubReport>,
        ),
    > {
        let mut balance_vault_sharding_sub_reports = BTreeMap::new();

        // balance vault seq id start from 0
        let max_balance_vault_seq_id = max(
            balances_changed_map.keys().max().unwrap_or(&0).clone(),
            snapshot_vault_cursor_of_pre_height.max_activated_balance_vault_seq_id,
        );
        let max_balance_vault_seq_id = max(
            max_balance_vault_seq_id,
            txs.keys().max().unwrap_or(&0).clone(),
        );
        let mut next_balance_vault_seq_id: BalanceVaultSeqId = 0;
        let mut balances_changed_map_mut = balances_changed_map;
        let total_balance_changed_count = balances_changed_merkle_tree.leaves_len();
        let mut txs_mut = txs;
        let mut rejected_txs_mut = rejected_txs;
        let total_tx_count = txs_merkle_tree.leaves_len();
        let total_rejected_tx_count = rejected_txs_merkle_tree.leaves_len();
        let mut next_sharding_start = 0usize;

        while next_balance_vault_seq_id <= max_balance_vault_seq_id {
            let mut balance_sharding_sub_reports = vec![];
            let mut accepted_withdrawal_sharding_sub_reports = vec![];
            let mut rejected_withdrawal_sharding_sub_reports = vec![];

            if balances_changed_map_mut.contains_key(&next_balance_vault_seq_id) {
                let mut balances_changed = balances_changed_map_mut
                    .remove(&next_balance_vault_seq_id)
                    .unwrap()
                    .into_iter()
                    .collect::<VecDeque<_>>();

                let total_balance_changed_count_in_sharding = balances_changed.len();
                let sharding_end = next_sharding_start + total_balance_changed_count_in_sharding;
                let mut next_report_range_start = next_sharding_start.clone();

                loop {
                    let mut balances = vec![];

                    let mut balances_changed_count = 0;

                    while balances_changed_count
                        < self.max_balance_changed_records_per_sharding_sub_report
                    {
                        if let Some(((wallet_id, asset_changed), index)) =
                            balances_changed.pop_front()
                        {
                            balances_changed_count += asset_changed.len();
                            balances.push(((wallet_id, asset_changed), index));
                        } else {
                            break;
                        }
                    }

                    let balances_changed_count = balances.len();

                    if balances_changed_count == 0 {
                        break;
                    }
                    let (balances_changed, indexes) = balances
                        .into_iter()
                        .map(|(records, index)| (records, index))
                        .unzip::<_, usize, Vec<_>, Vec<usize>>();

                    let range_end = next_report_range_start.clone() + balances_changed_count;

                    let range = ReportRange {
                        total_count: total_balance_changed_count.try_into().unwrap(),
                        sharding_range: CandidRange {
                            start: next_sharding_start.clone().try_into().unwrap(),
                            end: sharding_end.clone().try_into().unwrap(),
                        },
                        report_range: CandidRange {
                            start: next_report_range_start.try_into().unwrap(),
                            end: range_end.try_into().unwrap(),
                        },
                    };

                    let merkle_proof = balances_changed_merkle_tree.proof(&indexes);

                    let balance_sharding_sub_report = CandidBalanceShardingSubReport {
                        merkle_proof: merkle_proof.to_bytes(),
                        range,
                        balances_changed: convert_to_candid_balance_changed_records(
                            balances_changed,
                        ),
                    };

                    balance_sharding_sub_reports.push(balance_sharding_sub_report);
                    next_report_range_start += balances_changed_count;
                }
                next_sharding_start += total_balance_changed_count_in_sharding;
            }

            if txs_mut.contains_key(&next_balance_vault_seq_id) {
                let mut txs = txs_mut
                    .remove(&next_balance_vault_seq_id)
                    .unwrap()
                    .into_iter()
                    .collect::<VecDeque<_>>();

                let total_tx_count_in_sharding = txs.len();

                loop {
                    let mut withdrawals = vec![];

                    for _ in 0..self.max_withdrawal_txs_per_sharding_sub_report {
                        if let Some((tx, index)) = txs.pop_front() {
                            withdrawals.push((tx, index));
                        } else {
                            break;
                        }
                    }

                    if withdrawals.is_empty() {
                        break;
                    }
                    let (withdrawals, indexes) = withdrawals
                        .into_iter()
                        .map(|(tx, index)| (tx, index.clone()))
                        .unzip::<EncodedTransaction, usize, Vec<EncodedTransaction>, Vec<usize>>();

                    let report_start = accepted_withdrawal_sharding_sub_reports.len()
                        * self.max_withdrawal_txs_per_sharding_sub_report.clone();
                    let report_end = report_start + indexes.len();

                    let range = ReportRange {
                        total_count: total_tx_count.try_into().unwrap(),
                        sharding_range: CandidRange {
                            start: 0u32,
                            end: total_tx_count_in_sharding.try_into().unwrap(),
                        },
                        report_range: CandidRange {
                            start: report_start.try_into().unwrap(),
                            end: report_end.try_into().unwrap(),
                        },
                    };

                    let merkle_proof = txs_merkle_tree.proof(&indexes);

                    let accepted_withdrawal_sharding_sub_report =
                        AcceptedWithdrawalShardingSubReport {
                            merkle_proof: merkle_proof.to_bytes(),
                            range,
                            withdrawals,
                            withdrawal_indexes: indexes
                                .into_iter()
                                .map(|i| i.try_into().unwrap())
                                .collect(),
                        };

                    accepted_withdrawal_sharding_sub_reports
                        .push(accepted_withdrawal_sharding_sub_report);
                }
            }

            if rejected_txs_mut.contains_key(&next_balance_vault_seq_id) {
                let mut rejected_txs = rejected_txs_mut
                    .remove(&next_balance_vault_seq_id)
                    .unwrap()
                    .into_iter()
                    .collect::<VecDeque<_>>();

                let total_rejected_tx_count_in_sharding = rejected_txs.len();

                loop {
                    let mut rejected_withdrawals = vec![];

                    for _ in 0..self.max_withdrawal_txs_per_sharding_sub_report {
                        if let Some((tx, index)) = rejected_txs.pop_front() {
                            rejected_withdrawals.push((tx, index));
                        } else {
                            break;
                        }
                    }

                    if rejected_withdrawals.is_empty() {
                        break;
                    }
                    let (withdrawals, indexes) = rejected_withdrawals
                        .into_iter()
                        .map(|(tx, index)| (tx, index.clone()))
                        .unzip::<EncodedTransaction, usize, Vec<EncodedTransaction>, Vec<usize>>();

                    let report_start = rejected_withdrawal_sharding_sub_reports.len()
                        * self.max_withdrawal_txs_per_sharding_sub_report.clone();
                    let report_end = report_start + indexes.len();

                    let range = ReportRange {
                        total_count: total_rejected_tx_count.try_into().unwrap(),
                        sharding_range: CandidRange {
                            start: 0u32,
                            end: total_rejected_tx_count_in_sharding.try_into().unwrap(),
                        },
                        report_range: CandidRange {
                            start: report_start.try_into().unwrap(),
                            end: report_end.try_into().unwrap(),
                        },
                    };

                    let merkle_proof = rejected_txs_merkle_tree.proof(&indexes);

                    let rejected_withdrawal_sharding_sub_report =
                        RejectedWithdrawalShardingSubReport {
                            merkle_proof: merkle_proof.to_bytes(),
                            range,
                            withdrawals,
                            withdrawal_indexes: indexes
                                .into_iter()
                                .map(|i| i.try_into().unwrap())
                                .collect(),
                        };

                    rejected_withdrawal_sharding_sub_reports
                        .push(rejected_withdrawal_sharding_sub_report);
                }
            }

            balance_vault_sharding_sub_reports.insert(
                next_balance_vault_seq_id,
                (
                    balance_sharding_sub_reports,
                    accepted_withdrawal_sharding_sub_reports,
                    rejected_withdrawal_sharding_sub_reports,
                ),
            );

            next_balance_vault_seq_id += 1;
        }

        balance_vault_sharding_sub_reports
    }

    /// Generate balance vault data integrity
    pub(crate) fn generate_balance_vault_data_integrity(
        &self,
        current_block_height: &BlockHeight,
        balance_sharding_sub_reports: &BTreeMap<
            BalanceVaultSeqId,
            (
                Vec<CandidBalanceShardingSubReport>,
                Vec<AcceptedWithdrawalShardingSubReport>,
                Vec<RejectedWithdrawalShardingSubReport>,
            ),
        >,
        snapshot_cursor: &SnapshotCursor,
        snapshot_vault_index: &SnapshotVaultIndex,
    ) -> (
        BTreeMap<BalanceVaultSeqId, MerkleNode>,
        Vec<ActivatedBalanceVault>,
    ) {
        let mut balance_vault_data_integrity = BTreeMap::new();
        let mut new_activated_balance_vaults = vec![];
        balance_sharding_sub_reports.iter().for_each(
            |(
                balance_vault_seq_id,
                (balance_reports, accepted_withdrawal_reports, rejected_withdrawal_reports),
            )| {
                // pre block height process in this balance vault
                let mut pre_block_height_processed: Option<BlockHeight> = None;

                if balance_vault_seq_id <= &snapshot_cursor.max_activated_balance_vault_seq_id
                    && *current_block_height > BlockHeight::zero()
                {
                    pre_block_height_processed = Some(
                        current_block_height
                            .checked_sub(&BlockHeight::one())
                            .unwrap(),
                    );
                }

                let total_vaults_count = snapshot_vault_index.get_total_vaults_count();
                let total_merkle_node_count = snapshot_vault_index.get_total_merkle_node_count();
                let balance_vault_index =
                    snapshot_vault_index.get_balance_vault_index(balance_vault_seq_id);

                let mut data_buffer = vec![];
                // 2.1. pre_block_height_bytes: the block height of the last processed block
                data_buffer.extend(
                    serialize(&pre_block_height_processed).expect("serialize should not fail"),
                );
                // 2.2. current_block_height_bytes: the block height of the current processed block
                data_buffer.extend(current_block_height.0.to_bytes_le());
                // 2.3. canister_id_bytes: current balance vault sequence id
                data_buffer.extend(balance_vault_seq_id.to_le_bytes());
                // 2.4. total_merkle_node_count_bytes
                data_buffer.extend(total_merkle_node_count.to_le_bytes());
                // 2.5. canister_index_bytes: the index of the current balance vault canister
                //   in the total canisters
                data_buffer.extend(balance_vault_index.to_le_bytes());

                // 2.6. wallet_count_balance_changed: the wallet count in this balance vault
                let wallet_count_balance_changed: u32 = balance_reports
                    .iter()
                    .map(|report| report.balances_changed.0.len())
                    .sum::<usize>()
                    .try_into()
                    .unwrap();
                data_buffer.extend(wallet_count_balance_changed.to_le_bytes());

                // 2.6. accepted_withdrawal_count: the withdrawal count in this balance vault
                let count_accepted_withdrawal: u32 = accepted_withdrawal_reports
                    .iter()
                    .map(|report| report.withdrawals.len())
                    .sum::<usize>()
                    .try_into()
                    .unwrap();
                data_buffer.extend(count_accepted_withdrawal.to_le_bytes());

                // 2.7. rejected_withdrawal_count: the withdrawal count in this balance vault
                let count_rejected_withdrawal: u32 = rejected_withdrawal_reports
                    .iter()
                    .map(|report| report.withdrawals.len())
                    .sum::<usize>()
                    .try_into()
                    .unwrap();
                data_buffer.extend(count_rejected_withdrawal.to_le_bytes());

                // 2.8. data_bytes: the bytes of current sharding report
                //     2.8.1. balance changed data bytes
                balance_reports
                    .iter()
                    .for_each(|sub_report| data_buffer.extend(&sub_report.merkle_proof));

                //     2.8.2. accepted withdrawal data bytes
                accepted_withdrawal_reports
                    .iter()
                    .for_each(|report| data_buffer.extend(&report.merkle_proof));

                //     2.8.3. rejected withdrawal data bytes
                rejected_withdrawal_reports.iter().for_each(|report| {
                    data_buffer.extend(&report.merkle_proof);
                });

                let data_integrity: MerkleNode = sha256(data_buffer);

                balance_vault_data_integrity.insert(balance_vault_seq_id.clone(), data_integrity);

                if pre_block_height_processed.is_none() {
                    new_activated_balance_vaults.push(ActivatedBalanceVault {
                        canister_seq_id: balance_vault_seq_id.clone(),
                        canister_total_count: total_vaults_count,
                        canister_index: balance_vault_index,
                        hash_of_sharding: data_integrity,
                    });
                }
            },
        );

        (balance_vault_data_integrity, new_activated_balance_vaults)
    }
    /// Get balance vault seq id by wallet id
    pub fn get_balance_vault_seq_id_by_wallet_id(
        &self,
        wallet_id: &WalletRegisterId,
    ) -> BalanceVaultSeqId {
        // balance vault seq id is start from 0
        // every balance vault has [MAX_WALLETS_PER_BALANCE_VAULT_CANISTER] wallets
        // so we can get balance vault seq id by wallet id
        let wallet_id = wallet_id
            .checked_sub(&WalletRegisterId::from(self.start_wallet_id.clone()))
            .unwrap();
        (wallet_id.0 / self.max_wallets_per_balance_vault_canister.clone())
            .to_u64()
            .unwrap()
    }

    pub(crate) fn build_balances_changed_merkle_tree(
        &self,
        balances_changed: &Vec<(WalletRegisterId, Vec<(AssetId, BalanceChanged)>)>,
    ) -> MerkleTree<Sha256> {
        let nodes: Vec<MerkleNode> = balances_changed
            .iter()
            .map(|item| sha256(&bincode::serialize(item).unwrap()))
            .collect();
        MerkleTree::<Sha256>::from_leaves(&nodes)
    }
}