use std::collections::VecDeque;
use brk_error::Result;
use brk_traversable::Traversable;
use brk_types::{Height, get_percentile};
use derive_more::{Deref, DerefMut};
use schemars::JsonSchema;
use vecdb::{
AnyStoredVec, AnyVec, CheckedSub, Database, Exit, ReadableVec, Rw, StorageMode, VecIndex,
VecValue, Version, WritableVec,
};
use crate::{
indexes,
internal::{ComputedVecValue, DistributionStats, NumericValue, PerBlock},
};
#[derive(Deref, DerefMut, Traversable)]
#[traversable(transparent)]
pub struct PerBlockDistribution<T: ComputedVecValue + PartialOrd + JsonSchema, M: StorageMode = Rw>(
pub DistributionStats<PerBlock<T, M>>,
);
impl<T: NumericValue + JsonSchema> PerBlockDistribution<T> {
pub(crate) fn forced_import(
db: &Database,
name: &str,
version: Version,
indexes: &indexes::Vecs,
) -> Result<Self> {
Ok(Self(DistributionStats::try_from_fn(|suffix| {
PerBlock::forced_import(db, &format!("{name}_{suffix}"), version, indexes)
})?))
}
pub(crate) fn compute_with_skip<A>(
&mut self,
max_from: Height,
source: &impl ReadableVec<A, T>,
first_indexes: &impl ReadableVec<Height, A>,
count_indexes: &impl ReadableVec<Height, brk_types::StoredU64>,
exit: &Exit,
skip_count: usize,
) -> Result<()>
where
A: VecIndex + VecValue + brk_types::CheckedSub<A>,
{
let DistributionStats {
min,
max,
pct10,
pct25,
median,
pct75,
pct90,
} = &mut self.0;
let min = &mut min.height;
let max = &mut max.height;
let pct10 = &mut pct10.height;
let pct25 = &mut pct25.height;
let median = &mut median.height;
let pct75 = &mut pct75.height;
let pct90 = &mut pct90.height;
let combined_version = source.version() + first_indexes.version() + count_indexes.version();
let mut index = max_from;
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.validate_computed_version_or_reset(combined_version)?;
index = index.min(Height::from(vec.len()));
}
let start = index.to_usize();
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.truncate_if_needed_at(start)?;
}
let fi_len = first_indexes.len();
let first_indexes_batch: Vec<A> = first_indexes.collect_range_at(start, fi_len);
let count_indexes_batch: Vec<brk_types::StoredU64> =
count_indexes.collect_range_at(start, fi_len);
let mut values: Vec<T> = Vec::new();
first_indexes_batch
.into_iter()
.zip(count_indexes_batch)
.try_for_each(|(first_index, count_index)| -> Result<()> {
let count = u64::from(count_index) as usize;
let effective_count = count.saturating_sub(skip_count);
let effective_first_index = first_index + skip_count.min(count);
source.collect_range_into_at(
effective_first_index.to_usize(),
effective_first_index.to_usize() + effective_count,
&mut values,
);
if values.is_empty() {
let zero = T::from(0_usize);
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.push(zero);
}
} else {
values.sort_unstable();
max.push(*values.last().unwrap());
pct90.push(get_percentile(&values, 0.90));
pct75.push(get_percentile(&values, 0.75));
median.push(get_percentile(&values, 0.50));
pct25.push(get_percentile(&values, 0.25));
pct10.push(get_percentile(&values, 0.10));
min.push(*values.first().unwrap());
}
Ok(())
})?;
let _lock = exit.lock();
for vec in [min, max, median, pct10, pct25, pct75, pct90] {
vec.write()?;
}
Ok(())
}
pub(crate) fn compute_from_nblocks<A>(
&mut self,
max_from: Height,
source: &(impl ReadableVec<A, T> + Sized),
first_indexes: &impl ReadableVec<Height, A>,
count_indexes: &impl ReadableVec<Height, brk_types::StoredU64>,
n_blocks: usize,
exit: &Exit,
) -> Result<()>
where
T: CheckedSub,
A: VecIndex + VecValue + brk_types::CheckedSub<A>,
{
let DistributionStats {
min,
max,
pct10,
pct25,
median,
pct75,
pct90,
} = &mut self.0;
let min = &mut min.height;
let max = &mut max.height;
let pct10 = &mut pct10.height;
let pct25 = &mut pct25.height;
let median = &mut median.height;
let pct75 = &mut pct75.height;
let pct90 = &mut pct90.height;
let combined_version = source.version() + first_indexes.version() + count_indexes.version();
let mut index = max_from;
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.validate_computed_version_or_reset(combined_version)?;
index = index.min(Height::from(vec.len()));
}
let start = index.to_usize();
let fi_len = first_indexes.len();
let batch_start = start.saturating_sub(n_blocks - 1);
let first_indexes_batch: Vec<A> = first_indexes.collect_range_at(batch_start, fi_len);
let count_indexes_all: Vec<brk_types::StoredU64> =
count_indexes.collect_range_at(batch_start, fi_len);
let zero = T::from(0_usize);
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.truncate_if_needed_at(start)?;
}
let mut block_ring: VecDeque<Vec<T>> = VecDeque::with_capacity(n_blocks + 1);
let mut cursor = source.cursor();
let mut sorted_window: Vec<T> = Vec::new();
let mut merge_buf: Vec<T> = Vec::new();
let window_start_of_first = start.saturating_sub(n_blocks - 1);
for block_idx in window_start_of_first..start {
let fi = first_indexes_batch[block_idx - batch_start].to_usize();
let count = u64::from(count_indexes_all[block_idx - batch_start]) as usize;
if cursor.position() < fi {
cursor.advance(fi - cursor.position());
}
let mut bv = Vec::with_capacity(count);
cursor.for_each(count, |v: T| bv.push(v));
bv.sort_unstable();
sorted_window.extend_from_slice(&bv);
block_ring.push_back(bv);
}
sorted_window.sort();
for j in 0..(fi_len - start) {
let idx = start + j;
let fi = first_indexes_batch[idx - batch_start].to_usize();
let count = u64::from(count_indexes_all[idx - batch_start]) as usize;
if cursor.position() < fi {
cursor.advance(fi - cursor.position());
}
let mut new_block = Vec::with_capacity(count);
cursor.for_each(count, |v: T| new_block.push(v));
new_block.sort_unstable();
merge_buf.clear();
merge_buf.reserve(sorted_window.len() + new_block.len());
let (mut si, mut ni) = (0, 0);
while si < sorted_window.len() && ni < new_block.len() {
if sorted_window[si] <= new_block[ni] {
merge_buf.push(sorted_window[si]);
si += 1;
} else {
merge_buf.push(new_block[ni]);
ni += 1;
}
}
merge_buf.extend_from_slice(&sorted_window[si..]);
merge_buf.extend_from_slice(&new_block[ni..]);
std::mem::swap(&mut sorted_window, &mut merge_buf);
block_ring.push_back(new_block);
if block_ring.len() > n_blocks {
let expired = block_ring.pop_front().unwrap();
merge_buf.clear();
merge_buf.reserve(sorted_window.len());
let mut ei = 0;
for &v in &sorted_window {
if ei < expired.len() && v == expired[ei] {
ei += 1;
} else {
merge_buf.push(v);
}
}
std::mem::swap(&mut sorted_window, &mut merge_buf);
}
if sorted_window.is_empty() {
for vec in [
&mut *min,
&mut *max,
&mut *median,
&mut *pct10,
&mut *pct25,
&mut *pct75,
&mut *pct90,
] {
vec.push(zero);
}
} else {
max.push(*sorted_window.last().unwrap());
pct90.push(get_percentile(&sorted_window, 0.90));
pct75.push(get_percentile(&sorted_window, 0.75));
median.push(get_percentile(&sorted_window, 0.50));
pct25.push(get_percentile(&sorted_window, 0.25));
pct10.push(get_percentile(&sorted_window, 0.10));
min.push(*sorted_window.first().unwrap());
}
}
let _lock = exit.lock();
for vec in [min, max, median, pct10, pct25, pct75, pct90] {
vec.write()?;
}
Ok(())
}
}