use crate::{BlockBuilder, SnapshotVaultIndex};
use ex3_balance_vault_public_types::{
AcceptedWithdrawalShardingSubReport, CandidBalanceShardingSubReport,
RejectedWithdrawalShardingSubReport,
};
use ex3_blockchain_public_types::{ActivatedBalanceVault, SnapshotCursor};
use ex3_canister_types::range::ReportRange;
use ex3_crypto::sha256;
use ex3_node_converter::convert_to_candid_balance_changed_records;
use ex3_node_types::balance_changed::BalanceChanged;
use ex3_node_types::range::CandidRange;
use ex3_node_types::transaction::EncodedTransaction;
use ex3_node_types::{AssetId, BalanceVaultSeqId, BlockHeight};
use ex3_node_types::{MerkleNode, WalletRegisterId};
use ex3_serde::bincode;
use ex3_serde::bincode::serialize;
use num_traits::{CheckedSub, One, ToPrimitive, Zero};
use rs_merkle::algorithms::Sha256;
use rs_merkle::MerkleTree;
use std::cmp::max;
use std::collections::{BTreeMap, VecDeque};
impl BlockBuilder {
/// Splits the balance changes, accepted withdrawals and rejected withdrawals
/// of every balance vault into size-limited sharding sub reports.
///
/// # Parameters
/// * `balances_changed_map` - per vault, the changed wallets together with the
///   index that is handed to `balances_changed_merkle_tree.proof`.
/// * `balances_changed_merkle_tree` - tree the balance-change proofs are taken
///   from; its leaf count becomes `total_count` of every balance report range.
/// * `txs` / `txs_merkle_tree` - per vault accepted withdrawals (with their
///   proof index) and the tree their proofs are taken from.
/// * `rejected_txs` / `rejected_txs_merkle_tree` - same for rejected
///   withdrawals.
/// * `snapshot_vault_cursor_of_pre_height` - only its
///   `max_activated_balance_vault_seq_id` is read, as a lower bound for the
///   highest vault id that gets an entry.
///
/// # Returns
/// A map with one entry for **every** vault sequence id from `0` up to the
/// largest id found in `balances_changed_map`, `txs` or the snapshot cursor
/// (inclusive). Vaults without data get three empty vectors.
///
/// Balance reports carry ranges expressed as running offsets across all
/// vaults processed so far, while withdrawal reports use ranges that start at
/// `0` for each vault.
///
/// If one of the per-report limits on `self` is zero, no report of that kind
/// is produced and the corresponding entries are dropped.
///
/// # Panics
/// Panics when a count or offset does not fit into the integer type of the
/// corresponding range / index field.
pub(crate) fn generate_balance_sharding_sub_reports(
    &self,
    balances_changed_map: BTreeMap<
        BalanceVaultSeqId,
        Vec<((WalletRegisterId, Vec<(AssetId, BalanceChanged)>), usize)>,
    >,
    balances_changed_merkle_tree: &MerkleTree<Sha256>,
    txs: BTreeMap<BalanceVaultSeqId, Vec<(EncodedTransaction, usize)>>,
    txs_merkle_tree: &MerkleTree<Sha256>,
    rejected_txs: BTreeMap<BalanceVaultSeqId, Vec<(EncodedTransaction, usize)>>,
    rejected_txs_merkle_tree: &MerkleTree<Sha256>,
    snapshot_vault_cursor_of_pre_height: &SnapshotCursor,
) -> BTreeMap<
    BalanceVaultSeqId,
    (
        Vec<CandidBalanceShardingSubReport>,
        Vec<AcceptedWithdrawalShardingSubReport>,
        Vec<RejectedWithdrawalShardingSubReport>,
    ),
> {
    // NOTE(review): keys of `rejected_txs` do not take part in this bound, so
    // rejected withdrawals filed under a larger vault id are never reported.
    // Presumably such ids cannot occur - confirm against the caller.
    let max_balance_vault_seq_id = max(
        max(
            balances_changed_map.keys().max().copied().unwrap_or(0),
            snapshot_vault_cursor_of_pre_height.max_activated_balance_vault_seq_id,
        ),
        txs.keys().max().copied().unwrap_or(0),
    );

    // The maps are consumed entry by entry while walking the vault ids.
    let mut balances_changed_map = balances_changed_map;
    let mut txs = txs;
    let mut rejected_txs = rejected_txs;

    let total_balance_changed_count = balances_changed_merkle_tree.leaves_len();
    let total_tx_count = txs_merkle_tree.leaves_len();
    let total_rejected_tx_count = rejected_txs_merkle_tree.leaves_len();
    let max_records_per_report = self.max_balance_changed_records_per_sharding_sub_report;
    let max_withdrawals_per_report = self.max_withdrawal_txs_per_sharding_sub_report;

    // Cuts one vault's withdrawals into chunks of at most
    // `max_withdrawals_per_report` transactions. Yields, per chunk, the report
    // range, the serialized merkle proof, the transactions and their proof
    // indexes. Shared by the accepted and the rejected branch below.
    let split_withdrawals = |entries: Vec<(EncodedTransaction, usize)>,
                             merkle_tree: &MerkleTree<Sha256>,
                             total_count: usize| {
        let count_in_sharding = entries.len();
        let mut remaining = entries.into_iter();
        let mut chunks = Vec::new();
        let mut report_start = 0usize;
        loop {
            let (withdrawals, indexes): (Vec<EncodedTransaction>, Vec<usize>) = remaining
                .by_ref()
                .take(max_withdrawals_per_report)
                .unzip();
            if indexes.is_empty() {
                break;
            }
            // Every chunk but the last is full, so the running offset equals
            // `chunk number * max_withdrawals_per_report`.
            let report_end = report_start + indexes.len();
            let range = ReportRange {
                total_count: total_count.try_into().unwrap(),
                sharding_range: CandidRange {
                    start: 0u32,
                    end: count_in_sharding.try_into().unwrap(),
                },
                report_range: CandidRange {
                    start: report_start.try_into().unwrap(),
                    end: report_end.try_into().unwrap(),
                },
            };
            let merkle_proof = merkle_tree.proof(&indexes).to_bytes();
            chunks.push((range, merkle_proof, withdrawals, indexes));
            report_start = report_end;
        }
        chunks
    };

    let mut balance_vault_sharding_sub_reports = BTreeMap::new();
    // Offset of the current vault's first balance change, counted across all
    // vaults handled so far.
    let mut next_sharding_start = 0usize;

    for balance_vault_seq_id in 0..=max_balance_vault_seq_id {
        let mut balance_sharding_sub_reports = Vec::new();
        let mut accepted_withdrawal_sharding_sub_reports = Vec::new();
        let mut rejected_withdrawal_sharding_sub_reports = Vec::new();

        if let Some(entries) = balances_changed_map.remove(&balance_vault_seq_id) {
            let mut pending = VecDeque::from(entries);
            let count_in_sharding = pending.len();
            let sharding_end = next_sharding_start + count_in_sharding;
            let mut next_report_range_start = next_sharding_start;
            loop {
                // The limit counts individual asset records, but a wallet is
                // never split: the wallet that crosses the limit is still
                // included in the current report.
                let mut batch = Vec::new();
                let mut record_count = 0usize;
                while record_count < max_records_per_report {
                    match pending.pop_front() {
                        Some(((wallet_id, asset_changed), index)) => {
                            record_count += asset_changed.len();
                            batch.push(((wallet_id, asset_changed), index));
                        }
                        None => break,
                    }
                }
                if batch.is_empty() {
                    break;
                }
                // Ranges are measured in wallets, not in asset records.
                let wallet_count = batch.len();
                let (balances_changed, indexes): (Vec<_>, Vec<usize>) =
                    batch.into_iter().unzip();
                let range_end = next_report_range_start + wallet_count;
                let range = ReportRange {
                    total_count: total_balance_changed_count.try_into().unwrap(),
                    sharding_range: CandidRange {
                        start: next_sharding_start.try_into().unwrap(),
                        end: sharding_end.try_into().unwrap(),
                    },
                    report_range: CandidRange {
                        start: next_report_range_start.try_into().unwrap(),
                        end: range_end.try_into().unwrap(),
                    },
                };
                let merkle_proof = balances_changed_merkle_tree.proof(&indexes);
                balance_sharding_sub_reports.push(CandidBalanceShardingSubReport {
                    merkle_proof: merkle_proof.to_bytes(),
                    range,
                    balances_changed: convert_to_candid_balance_changed_records(
                        balances_changed,
                    ),
                });
                next_report_range_start = range_end;
            }
            next_sharding_start = sharding_end;
        }

        if let Some(entries) = txs.remove(&balance_vault_seq_id) {
            accepted_withdrawal_sharding_sub_reports.extend(
                split_withdrawals(entries, txs_merkle_tree, total_tx_count)
                    .into_iter()
                    .map(|(range, merkle_proof, withdrawals, indexes)| {
                        AcceptedWithdrawalShardingSubReport {
                            merkle_proof,
                            range,
                            withdrawals,
                            withdrawal_indexes: indexes
                                .into_iter()
                                .map(|i| i.try_into().unwrap())
                                .collect(),
                        }
                    }),
            );
        }

        if let Some(entries) = rejected_txs.remove(&balance_vault_seq_id) {
            rejected_withdrawal_sharding_sub_reports.extend(
                split_withdrawals(entries, rejected_txs_merkle_tree, total_rejected_tx_count)
                    .into_iter()
                    .map(|(range, merkle_proof, withdrawals, indexes)| {
                        RejectedWithdrawalShardingSubReport {
                            merkle_proof,
                            range,
                            withdrawals,
                            withdrawal_indexes: indexes
                                .into_iter()
                                .map(|i| i.try_into().unwrap())
                                .collect(),
                        }
                    }),
            );
        }

        balance_vault_sharding_sub_reports.insert(
            balance_vault_seq_id,
            (
                balance_sharding_sub_reports,
                accepted_withdrawal_sharding_sub_reports,
                rejected_withdrawal_sharding_sub_reports,
            ),
        );
    }
    balance_vault_sharding_sub_reports
}
/// Derives a data-integrity hash for each balance vault from its sharding sub
/// reports and collects the vaults that are reported for the first time.
///
/// For every entry of `balance_sharding_sub_reports` the following is
/// concatenated, in this order, and hashed with `sha256`:
/// 1. the serialized previous block height (`Some(current - 1)` when the vault
///    id is not above `snapshot_cursor.max_activated_balance_vault_seq_id` and
///    the current height is above zero, otherwise `None`),
/// 2. the little-endian bytes of the current block height,
/// 3. the vault sequence id, the total merkle node count and the vault index
///    (each via `to_le_bytes`),
/// 4. three `u32` counters: wallets with balance changes, accepted
///    withdrawals, rejected withdrawals,
/// 5. the merkle proofs of the balance, accepted and rejected reports.
///
/// # Returns
/// The hash per vault sequence id, plus an `ActivatedBalanceVault` record for
/// every vault whose previous block height resolved to `None`.
///
/// # Panics
/// Panics if serialization fails or a counter does not fit into `u32`.
pub(crate) fn generate_balance_vault_data_integrity(
    &self,
    current_block_height: &BlockHeight,
    balance_sharding_sub_reports: &BTreeMap<
        BalanceVaultSeqId,
        (
            Vec<CandidBalanceShardingSubReport>,
            Vec<AcceptedWithdrawalShardingSubReport>,
            Vec<RejectedWithdrawalShardingSubReport>,
        ),
    >,
    snapshot_cursor: &SnapshotCursor,
    snapshot_vault_index: &SnapshotVaultIndex,
) -> (
    BTreeMap<BalanceVaultSeqId, MerkleNode>,
    Vec<ActivatedBalanceVault>,
) {
    let mut integrity_by_vault = BTreeMap::new();
    let mut newly_activated = Vec::new();

    for (balance_vault_seq_id, (balance_reports, accepted_reports, rejected_reports)) in
        balance_sharding_sub_reports
    {
        // A vault only has a previously processed height when it was already
        // activated and this is not the very first block.
        let seen_before = *balance_vault_seq_id
            <= snapshot_cursor.max_activated_balance_vault_seq_id
            && *current_block_height > BlockHeight::zero();
        let pre_block_height_processed: Option<BlockHeight> = if seen_before {
            Some(
                current_block_height
                    .checked_sub(&BlockHeight::one())
                    .unwrap(),
            )
        } else {
            None
        };

        let total_vaults_count = snapshot_vault_index.get_total_vaults_count();
        let total_merkle_node_count = snapshot_vault_index.get_total_merkle_node_count();
        let balance_vault_index =
            snapshot_vault_index.get_balance_vault_index(balance_vault_seq_id);

        // Header part of the hash input; the field order must not change.
        let mut hash_input = Vec::new();
        hash_input
            .extend(serialize(&pre_block_height_processed).expect("serialize should not fail"));
        hash_input.extend(current_block_height.0.to_bytes_le());
        hash_input.extend(balance_vault_seq_id.to_le_bytes());
        hash_input.extend(total_merkle_node_count.to_le_bytes());
        hash_input.extend(balance_vault_index.to_le_bytes());

        // Counters, each narrowed to u32 before being appended.
        let mut changed_wallets = 0usize;
        for report in balance_reports {
            changed_wallets += report.balances_changed.0.len();
        }
        let wallet_count_balance_changed = u32::try_from(changed_wallets).unwrap();
        hash_input.extend(wallet_count_balance_changed.to_le_bytes());

        let mut accepted_total = 0usize;
        for report in accepted_reports {
            accepted_total += report.withdrawals.len();
        }
        let count_accepted_withdrawal = u32::try_from(accepted_total).unwrap();
        hash_input.extend(count_accepted_withdrawal.to_le_bytes());

        let mut rejected_total = 0usize;
        for report in rejected_reports {
            rejected_total += report.withdrawals.len();
        }
        let count_rejected_withdrawal = u32::try_from(rejected_total).unwrap();
        hash_input.extend(count_rejected_withdrawal.to_le_bytes());

        // Proofs follow in report order: balances, accepted, rejected.
        for report in balance_reports {
            hash_input.extend(&report.merkle_proof);
        }
        for report in accepted_reports {
            hash_input.extend(&report.merkle_proof);
        }
        for report in rejected_reports {
            hash_input.extend(&report.merkle_proof);
        }

        let data_integrity: MerkleNode = sha256(hash_input);
        integrity_by_vault.insert(*balance_vault_seq_id, data_integrity);

        if pre_block_height_processed.is_none() {
            newly_activated.push(ActivatedBalanceVault {
                canister_seq_id: *balance_vault_seq_id,
                canister_total_count: total_vaults_count,
                canister_index: balance_vault_index,
                hash_of_sharding: data_integrity,
            });
        }
    }

    (integrity_by_vault, newly_activated)
}
/// Maps a wallet register id to the sequence id of its balance vault.
///
/// The wallet id is first made relative to `self.start_wallet_id`, then
/// divided by `self.max_wallets_per_balance_vault_canister`; the quotient is
/// the vault sequence id.
///
/// # Panics
/// Panics if the checked subtraction of `self.start_wallet_id` fails or if
/// the quotient cannot be represented as `u64`.
pub fn get_balance_vault_seq_id_by_wallet_id(
    &self,
    wallet_id: &WalletRegisterId,
) -> BalanceVaultSeqId {
    let first_wallet_id = WalletRegisterId::from(self.start_wallet_id.clone());
    let relative_wallet_id = wallet_id.checked_sub(&first_wallet_id).unwrap();
    let vault_position =
        relative_wallet_id.0 / self.max_wallets_per_balance_vault_canister.clone();
    vault_position.to_u64().unwrap()
}
/// Builds the SHA-256 merkle tree over the per-wallet balance changes.
///
/// Every `(wallet id, asset changes)` entry is bincode-serialized and hashed
/// with `sha256` to form one leaf; leaves keep the order of
/// `balances_changed`.
///
/// The parameter is a slice so callers may pass any contiguous view; an
/// existing `&Vec<_>` argument still works through deref coercion.
///
/// # Panics
/// Panics if bincode serialization of an entry fails.
pub(crate) fn build_balances_changed_merkle_tree(
    &self,
    balances_changed: &[(WalletRegisterId, Vec<(AssetId, BalanceChanged)>)],
) -> MerkleTree<Sha256> {
    let leaves: Vec<MerkleNode> = balances_changed
        .iter()
        .map(|entry| {
            let encoded = bincode::serialize(entry).expect("serialize should not fail");
            sha256(&encoded)
        })
        .collect();
    MerkleTree::<Sha256>::from_leaves(&leaves)
}
}