use std::{cmp::Reverse, collections::BinaryHeap, fs, path::Path};
use brk_cohort::{Filtered, PROFITABILITY_RANGE_COUNT, TERM_NAMES};
use brk_error::Result;
use brk_types::{BasisPoints16, Cents, CentsCompact, CostBasisDistribution, Date, Dollars, Sats};
use crate::distribution::metrics::{CostBasis, ProfitabilityMetrics};
use super::{
fenwick::{PercentileResult, ProfitabilityRangeResult},
groups::UTXOCohorts,
};
use super::COST_BASIS_PRICE_DIGITS;
impl UTXOCohorts {
pub(crate) fn push_aggregate_percentiles(
&mut self,
spot_price: Cents,
date_opt: Option<Date>,
states_path: &Path,
) -> Result<()> {
if self.caches.fenwick.is_initialized() {
self.push_fenwick_results(spot_price);
}
if let Some(date) = date_opt {
self.write_disk_distributions(date, states_path)?;
}
Ok(())
}
fn push_fenwick_results(&mut self, spot_price: Cents) {
let (all_d, sth_d, lth_d) = self.caches.fenwick.density(spot_price);
let all = self.caches.fenwick.percentiles_all();
push_cost_basis(&all, all_d, &mut self.all.metrics.cost_basis);
let sth = self.caches.fenwick.percentiles_sth();
push_cost_basis(&sth, sth_d, &mut self.sth.metrics.cost_basis);
let lth = self.caches.fenwick.percentiles_lth();
push_cost_basis(<h, lth_d, &mut self.lth.metrics.cost_basis);
let prof = self.caches.fenwick.profitability(spot_price);
push_profitability(&prof, &mut self.profitability);
}
fn write_disk_distributions(&mut self, date: Date, states_path: &Path) -> Result<()> {
let sth_filter = self.sth.metrics.filter.clone();
let maps: Vec<_> = self
.age_range
.iter()
.filter_map(|sub| {
let state = sub.state.as_ref()?;
let map = state.cost_basis_map();
if map.is_empty() {
return None;
}
let is_sth = sth_filter.includes(sub.filter());
Some((map, is_sth))
})
.collect();
if maps.is_empty() {
return Ok(());
}
let cap = maps.iter().map(|(m, _)| m.len()).max().unwrap_or(0);
let mut targets = AllSthLth {
all: MergeTarget::new(cap),
sth: MergeTarget::new(cap),
lth: MergeTarget::new(cap),
};
merge_k_way(&maps, &mut targets);
write_distribution(states_path, "all", date, targets.all.merged)?;
write_distribution(states_path, TERM_NAMES.short.id, date, targets.sth.merged)?;
write_distribution(states_path, TERM_NAMES.long.id, date, targets.lth.merged)?;
Ok(())
}
}
#[inline(always)]
fn push_cost_basis(percentiles: &PercentileResult, density_bps: u16, cost_basis: &mut CostBasis) {
cost_basis.push_minmax(percentiles.min_price, percentiles.max_price);
cost_basis.push_percentiles(&percentiles.sat_prices, &percentiles.usd_prices);
cost_basis.push_density(BasisPoints16::from(density_bps));
}
#[inline(always)]
fn raw_usd_to_dollars(raw: u128) -> Dollars {
Dollars::from(raw as f64 / 1e10)
}
fn push_profitability(
buckets: &[ProfitabilityRangeResult; PROFITABILITY_RANGE_COUNT],
metrics: &mut ProfitabilityMetrics,
) {
for (i, bucket) in metrics.range.as_array_mut().into_iter().enumerate() {
let r = &buckets[i];
bucket.push(
Sats::from(r.all_sats),
Sats::from(r.sth_sats),
raw_usd_to_dollars(r.all_usd),
raw_usd_to_dollars(r.sth_usd),
);
}
}
fn write_distribution(
states_path: &Path,
name: &str,
date: Date,
merged: Vec<(CentsCompact, Sats)>,
) -> Result<()> {
let dir = states_path.join(format!("utxo_{name}_cost_basis/by_date"));
fs::create_dir_all(&dir)?;
fs::write(
dir.join(date.to_string()),
CostBasisDistribution::serialize_iter(merged.into_iter())?,
)?;
Ok(())
}
struct AllSthLth<T> {
all: T,
sth: T,
lth: T,
}
impl<T> AllSthLth<T> {
fn term_mut(&mut self, is_sth: bool) -> &mut T {
if is_sth { &mut self.sth } else { &mut self.lth }
}
fn for_each_mut(&mut self, mut f: impl FnMut(&mut T)) {
f(&mut self.all);
f(&mut self.sth);
f(&mut self.lth);
}
}
struct MergeTarget {
price_sats: u64,
merged: Vec<(CentsCompact, Sats)>,
}
impl MergeTarget {
fn new(cap: usize) -> Self {
Self {
price_sats: 0,
merged: Vec::with_capacity(cap),
}
}
#[inline]
fn accumulate(&mut self, amount: u64) {
self.price_sats += amount;
}
fn finalize_price(&mut self, price: Cents) {
if self.price_sats > 0 {
let rounded: CentsCompact = price.round_to_dollar(COST_BASIS_PRICE_DIGITS).into();
if let Some((lp, ls)) = self.merged.last_mut()
&& *lp == rounded
{
*ls += Sats::from(self.price_sats);
} else {
self.merged.push((rounded, Sats::from(self.price_sats)));
}
}
self.price_sats = 0;
}
}
fn merge_k_way(
maps: &[(&std::collections::BTreeMap<CentsCompact, Sats>, bool)],
targets: &mut AllSthLth<MergeTarget>,
) {
let mut iters: Vec<_> = maps
.iter()
.map(|(map, is_sth)| (map.iter().peekable(), *is_sth))
.collect();
let mut heap: BinaryHeap<Reverse<(CentsCompact, usize)>> =
BinaryHeap::with_capacity(iters.len());
for (i, (iter, _)) in iters.iter_mut().enumerate() {
if let Some(&(&price, _)) = iter.peek() {
heap.push(Reverse((price, i)));
}
}
let mut current_price: Option<CentsCompact> = None;
while let Some(Reverse((price, ci))) = heap.pop() {
let (ref mut iter, is_sth) = iters[ci];
let (_, &sats) = iter.next().unwrap();
let amount = u64::from(sats);
if let Some(prev) = current_price
&& prev != price
{
targets.for_each_mut(|t| t.finalize_price(prev.into()));
}
current_price = Some(price);
targets.all.accumulate(amount);
targets.term_mut(is_sth).accumulate(amount);
if let Some(&(&next_price, _)) = iter.peek() {
heap.push(Reverse((next_price, ci)));
}
}
if let Some(price) = current_price {
targets.for_each_mut(|t| t.finalize_price(price.into()));
}
}