use std::path::Path;
use brk_cohort::{CohortContext, Filter, Filtered};
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{BasisPointsSigned32, Cents, Height, Indexes, StoredI64, StoredU64, Version};
use rayon::prelude::*;
use vecdb::{AnyStoredVec, AnyVec, Database, Exit, ReadableVec, Rw, StorageMode, WritableVec};
use crate::{
distribution::state::{AddrCohortState, MinimalRealizedState},
indexes,
internal::{CachedWindowStarts, PerBlockWithDeltas},
prices,
};
use crate::distribution::metrics::{ImportConfig, MinimalCohortMetrics};
use super::super::traits::{CohortVecs, DynCohortVecs};
#[derive(Traversable)]
pub struct AddrCohortVecs<M: StorageMode = Rw> {
starting_height: Option<Height>,
#[traversable(skip)]
pub state: Option<Box<AddrCohortState<MinimalRealizedState>>>,
#[traversable(flatten)]
pub metrics: MinimalCohortMetrics<M>,
pub addr_count: PerBlockWithDeltas<StoredU64, StoredI64, BasisPointsSigned32, M>,
}
impl AddrCohortVecs {
pub(crate) fn forced_import(
db: &Database,
filter: Filter,
name: &str,
version: Version,
indexes: &indexes::Vecs,
states_path: Option<&Path>,
cached_starts: &CachedWindowStarts,
) -> Result<Self> {
let full_name = CohortContext::Addr.full_name(&filter, name);
let cfg = ImportConfig {
db,
filter: &filter,
full_name: &full_name,
version,
indexes,
cached_starts,
};
let addr_count = PerBlockWithDeltas::forced_import(
db,
&cfg.name("addr_count"),
version,
Version::ONE,
indexes,
cached_starts,
)?;
Ok(Self {
starting_height: None,
state: states_path.map(|path| Box::new(AddrCohortState::new(path, &full_name))),
metrics: MinimalCohortMetrics::forced_import(&cfg)?,
addr_count,
})
}
pub(crate) fn reset_starting_height(&mut self) {
self.starting_height = Some(Height::ZERO);
}
pub(crate) fn par_iter_vecs_mut(
&mut self,
) -> impl ParallelIterator<Item = &mut dyn AnyStoredVec> {
let mut vecs: Vec<&mut dyn AnyStoredVec> = Vec::new();
vecs.push(&mut self.addr_count.height as &mut dyn AnyStoredVec);
vecs.extend(self.metrics.collect_all_vecs_mut());
vecs.into_par_iter()
}
pub(crate) fn write_state(&mut self, height: Height, cleanup: bool) -> Result<()> {
if let Some(state) = self.state.as_mut() {
state.inner.write(height, cleanup)?;
}
Ok(())
}
}
impl Filtered for AddrCohortVecs {
fn filter(&self) -> &Filter {
&self.metrics.filter
}
}
impl DynCohortVecs for AddrCohortVecs {
fn min_stateful_len(&self) -> usize {
self.addr_count
.height
.len()
.min(self.metrics.min_stateful_len())
}
fn reset_state_starting_height(&mut self) {
self.reset_starting_height();
if let Some(state) = self.state.as_mut() {
state.reset();
}
}
fn import_state(&mut self, starting_height: Height) -> Result<Height> {
if let Some(state) = self.state.as_mut() {
if let Some(mut prev_height) = starting_height.decremented() {
prev_height = state.inner.import_at_or_before(prev_height)?;
state.inner.supply.value = self
.metrics
.supply
.total
.sats
.height
.collect_one(prev_height)
.unwrap();
state.inner.supply.utxo_count = *self
.metrics
.outputs
.unspent_count
.height
.collect_one(prev_height)
.unwrap();
state.addr_count = *self.addr_count.height.collect_one(prev_height).unwrap();
state.inner.restore_realized_cap();
let result = prev_height.incremented();
self.starting_height = Some(result);
Ok(result)
} else {
self.starting_height = Some(Height::ZERO);
Ok(Height::ZERO)
}
} else {
self.starting_height = Some(starting_height);
Ok(starting_height)
}
}
fn validate_computed_versions(&mut self, base_version: Version) -> Result<()> {
use vecdb::WritableVec;
self.addr_count
.height
.validate_computed_version_or_reset(base_version)?;
Ok(())
}
fn push_state(&mut self, height: Height) {
if self.starting_height.is_some_and(|h| h > height) {
return;
}
if let Some(state) = self.state.as_ref() {
self.addr_count.height.push(state.addr_count.into());
self.metrics.supply.push_state(&state.inner);
self.metrics.outputs.push_state(&state.inner);
self.metrics.activity.push_state(&state.inner);
self.metrics.realized.push_state(&state.inner);
}
}
fn push_unrealized_state(&mut self, _height_price: Cents) {}
fn compute_rest_part1(
&mut self,
prices: &prices::Vecs,
starting_indexes: &Indexes,
exit: &Exit,
) -> Result<()> {
self.metrics
.compute_rest_part1(prices, starting_indexes, exit)
}
fn write_state(&mut self, height: Height, cleanup: bool) -> Result<()> {
if let Some(state) = self.state.as_mut() {
state.inner.write(height, cleanup)?;
}
Ok(())
}
fn reset_cost_basis_data_if_needed(&mut self) -> Result<()> {
if let Some(state) = self.state.as_mut() {
state.inner.reset_cost_basis_data_if_needed()?;
}
Ok(())
}
fn reset_single_iteration_values(&mut self) {
if let Some(state) = self.state.as_mut() {
state.inner.reset_single_iteration_values();
}
}
}
impl CohortVecs for AddrCohortVecs {
fn compute_from_stateful(
&mut self,
starting_indexes: &Indexes,
others: &[&Self],
exit: &Exit,
) -> Result<()> {
self.addr_count.height.compute_sum_of_others(
starting_indexes.height,
others
.iter()
.map(|v| &v.addr_count.height)
.collect::<Vec<_>>()
.as_slice(),
exit,
)?;
self.metrics.compute_from_sources(
starting_indexes,
&others.iter().map(|v| &v.metrics).collect::<Vec<_>>(),
exit,
)?;
Ok(())
}
fn compute_rest_part2(
&mut self,
prices: &prices::Vecs,
starting_indexes: &Indexes,
all_utxo_count: &impl ReadableVec<Height, StoredU64>,
exit: &Exit,
) -> Result<()> {
self.metrics
.compute_rest_part2(prices, starting_indexes, all_utxo_count, exit)
}
}