use std::time::Instant;
use brk_error::Result;
use brk_types::{EmptyAddrData, FundedAddrData, Height};
use rayon::prelude::*;
use tracing::info;
use vecdb::{AnyStoredVec, AnyVec, Stamp, VecIndex, WritableVec};
use crate::distribution::{
Vecs,
block::{WithAddrDataSource, process_empty_addrs, process_funded_addrs},
state::BlockState,
};
use super::super::addr::{AddrTypeToTypeIndexMap, AddrsDataVecs, AnyAddrIndexesVecs};
pub(crate) fn process_addr_updates(
addrs_data: &mut AddrsDataVecs,
addr_indexes: &mut AnyAddrIndexesVecs,
empty_updates: AddrTypeToTypeIndexMap<WithAddrDataSource<EmptyAddrData>>,
funded_updates: AddrTypeToTypeIndexMap<WithAddrDataSource<FundedAddrData>>,
) -> Result<()> {
info!("Processing addr updates...");
let i = Instant::now();
let empty_result = process_empty_addrs(addrs_data, empty_updates)?;
let funded_result = process_funded_addrs(addrs_data, funded_updates)?;
addr_indexes.par_batch_update(empty_result, funded_result)?;
info!("Processed addr updates in {:?}", i.elapsed());
Ok(())
}
pub(crate) fn write(
vecs: &mut Vecs,
height: Height,
chain_state: &[BlockState],
min_supply_modified: Option<Height>,
with_changes: bool,
) -> Result<()> {
info!("Writing to disk...");
let i = Instant::now();
let stamp = Stamp::from(height);
let supply_state_len = vecs.supply_state.len();
let truncate_to =
min_supply_modified.map_or(supply_state_len, |h| h.to_usize().min(supply_state_len));
vecs.supply_state
.truncate_if_needed(Height::from(truncate_to))?;
for block_state in &chain_state[truncate_to..] {
vecs.supply_state.push(block_state.supply);
}
vecs.any_addr_indexes
.par_iter_mut()
.chain(vecs.addrs_data.par_iter_mut())
.chain(vecs.addrs.funded.par_iter_height_mut())
.chain(vecs.addrs.empty.par_iter_height_mut())
.chain(vecs.addrs.activity.par_iter_height_mut())
.chain(
[
&mut vecs.supply_state as &mut dyn AnyStoredVec,
&mut vecs.coinblocks_destroyed.block,
]
.into_par_iter(),
)
.chain(vecs.utxo_cohorts.par_iter_vecs_mut())
.chain(vecs.addr_cohorts.par_iter_vecs_mut())
.try_for_each(|v| v.any_stamped_write_maybe_with_changes(stamp, with_changes))?;
let cleanup = with_changes;
vecs.utxo_cohorts.commit_all_states(height, cleanup)?;
vecs.addr_cohorts.commit_all_states(height, cleanup)?;
info!("Wrote in {:?}", i.elapsed());
Ok(())
}