use std::collections::HashMap;
use indexmap::IndexMap;
use polars::prelude::*;
use serde_json::{Map, Value as Json};
use crate::error::{Error, Result};
use crate::filters::{axis_row_filter, filter_exprs, focus_exprs};
use crate::manifest::FrameDataset;
use crate::metric::{combines, metric_plans, stat_exprs, MetricPlan};
use crate::output::{fingerprint, TreeNode};
use crate::source::Source;
/// Node name given to the synthetic bucket that aggregates the rows cut off by `top_k`.
pub const OTHER: &str = "__other__";
/// Upper bound applied to `TreemapArgs::top_k` (the value is clamped to `1..=MAX_TOP_K`).
const MAX_TOP_K: i64 = 100;
/// Upper bound applied to `TreemapArgs::depth` (the value is clamped to `1..=MAX_DEPTH`).
const MAX_DEPTH: i64 = 12;
/// Request parameters consumed by [`treemap`] and [`branch_set`].
pub struct TreemapArgs {
    /// Name of the dataset axis to break down; an unknown name is a schema error.
    pub axis: String,
    /// Values identifying the subtree to render. They are turned into row
    /// filters over the leading levels, and their count is subtracted from the
    /// number of available levels when bounding `depth`.
    pub focus: Vec<Json>,
    /// Additional filters, converted to predicates on the base frame.
    pub filters: Map<String, Json>,
    /// Optional metric id used to size and rank nodes; resolved through the
    /// dataset when `None`.
    pub size_by: Option<String>,
    /// Requested number of levels below the focus (clamped to `1..=MAX_DEPTH`).
    pub depth: i64,
    /// Maximum number of children kept per parent (clamped to `1..=MAX_TOP_K`);
    /// the remainder is folded into an `OTHER` node.
    pub top_k: i64,
}
impl TreemapArgs {
    /// Creates arguments for `axis` with the defaults: no focus, no filters,
    /// no explicit `size_by`, `depth = 2` and `top_k = 12`.
    pub fn new(axis: impl Into<String>) -> Self {
        let axis = axis.into();
        Self {
            axis,
            focus: Vec::new(),
            filters: Map::new(),
            size_by: None,
            depth: 2,
            top_k: 12,
        }
    }
}
/// Intermediate, flat representation of one tree node, collected per parent
/// before the nodes are linked into a `TreeNode` hierarchy by `attach`.
struct Built {
    /// Display label of the node (`OTHER` for the folded bucket).
    name: String,
    /// Values of the level columns identifying this node: the parent key
    /// followed by the node's own value (`Null` for the folded bucket).
    key: Vec<AnyValue<'static>>,
    /// Finalized metric values keyed by metric id.
    measures: IndexMap<String, Json>,
    /// `true` when this node is the folded "other" bucket.
    is_other: bool,
    /// Number of rows folded into this node (0 for regular nodes).
    n_folded: i64,
    /// Whether deeper levels exist below this node that were not expanded.
    has_more: bool,
}
/// Builds a treemap hierarchy for `args.axis` over the rows of `source`.
///
/// The root node carries the metric totals of the filtered (and focused) frame.
/// Below it, one level is added per iteration, up to the clamped `args.depth`:
/// rows are grouped by the level columns, ranked within each parent by the
/// size metric, the best `top_k` children per parent are kept, and the rest are
/// folded into a single node named [`OTHER`] per parent.
///
/// # Errors
///
/// Returns `Error::Schema` when the axis is unknown, when no size metric can be
/// resolved, when the size metric is a ratio, or when the resolved `size_by` has
/// no metric plan. Errors from the source, the helper modules and polars
/// evaluation are propagated with `?`.
pub fn treemap(ds: &FrameDataset, source: &dyn Source, args: &TreemapArgs) -> Result<TreeNode> {
// --- Resolve the axis and the sizing metric -------------------------------
let columns = source.columns()?;
let axis = ds
.axis(&args.axis)
.ok_or_else(|| Error::Schema(format!("unknown axis {:?}", args.axis)))?;
// Path axes get extra handling below (null filtering and per-node `_has_more`).
let is_path = axis.path.is_some();
let size_by = ds
.resolve_size_by(args.size_by.as_deref())
.ok_or_else(|| Error::Schema("no size_by / metrics".into()))?;
// Ratio metrics are rejected as a sizing metric.
if let Some(m) = ds.metrics.iter().find(|m| m.id == size_by) {
if m.is_ratio() {
return Err(Error::Schema(format!(
"cannot size a treemap by ratio metric {size_by:?} (ratios aren't additive areas)"
)));
}
}
let plans: Vec<MetricPlan> = metric_plans(&ds.metrics, &columns)?;
let size_plan = plans
.iter()
.find(|p| p.id == size_by)
.ok_or_else(|| Error::Schema(format!("size_by {size_by:?} not a metric")))?;
// `stats` are the aggregation expressions used at every level; `combos` are
// used later to build the expressions that fold tail rows into the "other" node.
let stats = stat_exprs(&plans);
let combos = combines(&plans);
// --- Build the filtered base frame ---------------------------------------
let mut base = source.frame()?;
for e in filter_exprs(ds, &args.filters) {
base = base.filter(e);
}
if let Some(pred) = axis_row_filter(axis, &columns)? {
base = base.filter(pred);
}
// Add the entity-mask columns only when there are any.
let masks = crate::metric::entity_mask_exprs(&ds.metrics, &ds.id_column, &columns)?;
let base = if masks.is_empty() {
base
} else {
base.with_columns(masks)
};
// `levels` holds the names of the level columns, outermost first; the helper
// also returns the frame to use from here on.
let (levels, base) = crate::path::resolved_levels(axis, base)?;
let levels = &levels;
// --- Clamp the request ----------------------------------------------------
let top_k = args.top_k.clamp(1, MAX_TOP_K) as usize;
let mut depth = args.depth.clamp(1, MAX_DEPTH);
// `f` is the number of levels already pinned by the focus; depth cannot exceed
// the levels remaining below it (this may make `depth` zero or negative).
let f = args.focus.len();
depth = depth.min(levels.len() as i64 - f as i64);
let mut base = base;
for e in focus_exprs(levels, &args.focus) {
base = base.filter(e);
}
// Root measures are the metric totals over the filtered and focused frame.
let totals = base.clone().select(stats.clone()).collect()?;
let mut root = TreeNode {
name: "root".into(),
measures: finalize_row(&plans, &totals, 0),
children: vec![],
is_other: false,
n_folded: 0,
has_more: false,
};
// Nothing left to expand below the focus: return the totals only.
if depth <= 0 {
return Ok(root);
}
// Children collected per parent, keyed by the fingerprint of the parent key.
let mut by_parent: HashMap<String, Vec<Built>> = HashMap::new();
// Keys of the nodes kept at the previous level; `None` for the first level.
let mut kept_parents: Option<DataFrame> = None;
// `l` is the 1-based level being expanded: its own column is `levels[l - 1]`.
for l in (f + 1)..=(f + depth as usize) {
let level_cols: Vec<String> = levels[..l].to_vec();
let parent_cols: Vec<String> = levels[..l - 1].to_vec();
// True on the deepest requested level when further levels exist below it.
let boundary_has_more = l == (f + depth as usize) && l < levels.len();
let mut lf = base.clone();
// Path axes: skip rows that have no value at this level.
if is_path {
lf = lf.filter(col(level_cols[l - 1].as_str()).is_not_null());
}
// Only descend into parents that survived the top-k cut one level up.
if let Some(kp) = &kept_parents {
let keys: Vec<Expr> = parent_cols.iter().map(|c| col(c.as_str())).collect();
lf = lf.join(
kp.clone().lazy(),
keys.clone(),
keys,
JoinArgs::new(JoinType::Semi),
);
}
let group: Vec<Expr> = level_cols.iter().map(|c| col(c.as_str())).collect();
let mut agg_exprs = stats.clone();
// Path axes: `_has_more` is true when any row of the group has a non-null
// value in the next level column.
let has_more_col = is_path && l < levels.len();
if has_more_col {
agg_exprs.push(
col(levels[l].as_str())
.is_not_null()
.max()
.alias("_has_more"),
);
}
let agg = lf
.group_by(group)
.agg(agg_exprs)
.with_column(size_plan.rank_expr.clone().alias("_rankval"));
// Sort order: parent columns ascending, rank value descending, then this
// level's own column ascending, with nulls last.
let mut sort_by: Vec<Expr> = parent_cols.iter().map(|c| col(c.as_str())).collect();
sort_by.push(col("_rankval"));
sort_by.push(col(level_cols[l - 1].as_str()));
let mut desc = vec![false; parent_cols.len()];
desc.push(true); desc.push(false); let sorted = agg
.sort_by_exprs(
sort_by,
SortMultipleOptions::new()
.with_order_descending_multi(desc)
.with_nulls_last(true),
)
.collect()?;
// Number the rows within each parent: the counter restarts whenever the
// parent-key fingerprint differs from the previous row's.
let h = sorted.height();
let mut rn = Vec::with_capacity(h);
let mut last_fp: Option<String> = None;
let mut counter: u32 = 0;
for i in 0..h {
let pfp = fingerprint(&row_key(&sorted, &parent_cols, i)?);
if last_fp.as_ref() != Some(&pfp) {
counter = 0;
last_fp = Some(pfp);
}
counter += 1;
rn.push(counter);
}
let mut sorted = sorted;
let rn_i64: Vec<i64> = rn.iter().map(|v| *v as i64).collect();
sorted.with_column(Series::new("_rn".into(), rn_i64))?;
// Split into the kept rows (`_rn <= top_k`) and the tail to be folded.
let kept_mask: BooleanChunked = sorted.column("_rn")?.i64()?.lt_eq(top_k as i64);
let tail_mask = !&kept_mask;
let kept = sorted.filter(&kept_mask)?;
let tail = sorted.filter(&tail_mask)?;
// Emit one regular node per kept row, filed under its parent's fingerprint.
for i in 0..kept.height() {
let key = row_key(&kept, &level_cols, i)?;
let parent_fp = fingerprint(&key[..key.len() - 1]);
let name = av_to_label_static(&key[key.len() - 1]);
// Path axes read the per-row `_has_more` flag (false when the column is
// absent or unreadable); other axes use the level-wide boundary flag.
let has_more = if is_path {
has_more_col
&& kept
.column("_has_more")
.ok()
.and_then(|c| c.get(i).ok())
.map(|av| matches!(av, AnyValue::Boolean(true)))
.unwrap_or(false)
} else {
boundary_has_more
};
by_parent.entry(parent_fp).or_default().push(Built {
name,
key: key.clone(),
measures: finalize_row(&plans, &kept, i),
is_other: false,
n_folded: 0,
has_more,
});
}
// Fold the tail rows into one "other" node per parent.
if tail.height() > 0 {
let other_exprs: Vec<Expr> = combos
.iter()
.map(|(a, c)| MetricPlan::other_expr(a, *c))
.collect();
let pgroup: Vec<Expr> = parent_cols.iter().map(|c| col(c.as_str())).collect();
// Without parent columns there is a single implicit parent, so a plain
// select is used; otherwise group by parent and count folded rows as `__nf`.
let other = if parent_cols.is_empty() {
tail.clone().lazy().select(other_exprs).collect()?
} else {
let mut agg = other_exprs;
agg.push(len().alias("__nf"));
tail.clone().lazy().group_by(pgroup).agg(agg).collect()?
};
for i in 0..other.height() {
let parent_key = row_key(&other, &parent_cols, i)?;
let parent_fp = fingerprint(&parent_key);
// The bucket's own key is the parent key followed by a null value.
let mut key = parent_key.clone();
key.push(AnyValue::Null);
// Folded-row count: the whole tail when there is no parent column,
// otherwise the per-parent `__nf` count (falling back to 1 if unreadable).
let n_folded = if parent_cols.is_empty() {
tail.height() as i64
} else {
other
.column("__nf")
.ok()
.and_then(|c| c.get(i).ok())
.map(|av| crate::output::av_to_json(&av))
.and_then(|j| j.as_i64())
.unwrap_or(1)
};
by_parent.entry(parent_fp).or_default().push(Built {
name: OTHER.to_string(),
key,
measures: finalize_row(&plans, &other, i),
is_other: true,
n_folded,
has_more: false,
});
}
}
// The kept nodes' keys become the parent filter for the next level.
kept_parents =
Some(kept.select(level_cols.iter().map(|s| s.as_str()).collect::<Vec<_>>())?);
}
// Link the collected nodes into the tree, starting at the focus key.
attach(
&mut root,
&fingerprint(&focus_key(&args.focus)),
&mut by_parent,
);
Ok(root)
}
/// Summary of the first level of a treemap, as produced by [`branch_set`].
pub struct BranchSet {
    /// Names of the regular (non-folded) children, in tree order.
    pub keep: Vec<String>,
    /// Whether the level also contains a folded "other" node.
    pub has_other: bool,
}
pub fn branch_set(ds: &FrameDataset, source: &dyn Source, args: &TreemapArgs) -> Result<BranchSet> {
let mut a = TreemapArgs::new(args.axis.clone());
a.focus = args.focus.clone();
a.filters = args.filters.clone();
a.size_by = args.size_by.clone();
a.top_k = args.top_k;
a.depth = 1;
let tree = treemap(ds, source, &a)?;
let mut keep = Vec::new();
let mut has_other = false;
for c in &tree.children {
if c.is_other {
has_other = true;
} else {
keep.push(c.name.clone());
}
}
Ok(BranchSet { keep, has_other })
}
fn attach(node: &mut TreeNode, key_fp: &str, by_parent: &mut HashMap<String, Vec<Built>>) {
let built = by_parent.remove(key_fp).unwrap_or_default();
for b in built {
let child_fp = fingerprint(&b.key);
let mut child = TreeNode {
name: b.name,
measures: b.measures,
children: vec![],
is_other: b.is_other,
n_folded: b.n_folded,
has_more: b.has_more,
};
if !b.is_other {
attach(&mut child, &child_fp, by_parent);
}
node.children.push(child);
}
}
fn finalize_row(plans: &[MetricPlan], df: &DataFrame, i: usize) -> IndexMap<String, Json> {
let get = |alias: &str| -> AnyValue<'static> {
df.column(alias)
.ok()
.and_then(|c| c.get(i).ok())
.map(|av| av.into_static())
.unwrap_or(AnyValue::Null)
};
plans
.iter()
.map(|p| (p.id.clone(), p.finalize_row(&get)))
.collect()
}
/// Reads the values of `cols` at row `i` of `df`, in the order given.
///
/// # Errors
///
/// Fails on the first column that does not exist or whose row `i` cannot be read.
fn row_key(df: &DataFrame, cols: &[String], i: usize) -> Result<Vec<AnyValue<'static>>> {
    let mut key = Vec::with_capacity(cols.len());
    for name in cols {
        let column = df.column(name.as_str())?;
        key.push(column.get(i)?.into_static());
    }
    Ok(key)
}
fn focus_key(focus: &[Json]) -> Vec<AnyValue<'static>> {
focus.iter().map(json_to_av).collect()
}
/// Maps a JSON value onto an owned polars `AnyValue`.
///
/// - `null` becomes `Null` and booleans become `Boolean`.
/// - Numbers become `Int64` when representable as `i64`, otherwise `Float64`
///   (falling back to `0.0` if no float view is available).
/// - Strings become `StringOwned`.
/// - Any other value (arrays, objects) is stored as its JSON text.
fn json_to_av(v: &Json) -> AnyValue<'static> {
    match v {
        Json::Null => AnyValue::Null,
        Json::Bool(flag) => AnyValue::Boolean(*flag),
        Json::Number(num) => match num.as_i64() {
            Some(int) => AnyValue::Int64(int),
            None => AnyValue::Float64(num.as_f64().unwrap_or(0.0)),
        },
        Json::String(text) => AnyValue::StringOwned(text.as_str().into()),
        composite => AnyValue::StringOwned(composite.to_string().into()),
    }
}
fn av_to_label_static(av: &AnyValue<'static>) -> String {
crate::output::av_to_label(av)
}