use polars::prelude::*;
use serde_json::{json, Map, Value as Json};
use crate::error::{Error, Result};
use crate::filters::filter_exprs;
use crate::manifest::FrameDataset;
use crate::metric::value_expr;
use crate::output::av_to_json;
use crate::source::Source;
/// Maximum number of rows `scatter` returns when the caller passes no explicit `limit`.
const DEFAULT_SCATTER_LIMIT: u32 = 5000;
/// Reads row `i` of column `col` from `df` and converts it with `av_to_json`.
///
/// Yields `Json::Null` when either the column lookup or the row access fails,
/// so callers never have to handle an error for a single cell.
fn cell(df: &DataFrame, col: &str, i: usize) -> Json {
    match df.column(col) {
        Ok(series) => match series.get(i) {
            Ok(value) => av_to_json(&value),
            Err(_) => Json::Null,
        },
        Err(_) => Json::Null,
    }
}
/// Builds a predicate that holds when the lowercased string form of column
/// `colname` contains every entry of `tokens` as a literal substring.
///
/// Tokens are compared exactly as given; the callers in this file lowercase
/// the query before splitting it. With no tokens the predicate is `lit(true)`.
fn token_match(colname: &str, tokens: &[String]) -> Expr {
    // One "contains" clause per token, each on a freshly built lowercased column.
    let mut clauses = tokens.iter().map(|tok| {
        col(colname)
            .cast(DataType::String)
            .str()
            .to_lowercase()
            .str()
            .contains_literal(lit(tok.clone()))
    });
    match clauses.next() {
        // AND the remaining clauses onto the first one, left to right.
        Some(first) => clauses.fold(first, |acc, clause| acc.and(clause)),
        None => lit(true),
    }
}
/// Returns the points for a scatter plot of metric `x` against metric `y`.
///
/// The result is a JSON array of objects with the keys `id`, `label`, `x`,
/// `y` and `color_key`. Rows are filtered with `filter_exprs(ds, filters)`,
/// sorted by `x` descending with nulls last, and truncated to `limit`
/// (or `DEFAULT_SCATTER_LIMIT` when `limit` is `None`).
///
/// `color` names the column used for `color_key`. When it is `None` the
/// column defaults to the first level of the dataset's first axis, then to
/// that axis's path column, and finally to the id column.
///
/// # Errors
///
/// Returns `Error::Schema` when `x` or `y` is not a known metric, and
/// propagates any error from the source, `value_expr`, or query execution.
pub fn scatter(
    ds: &FrameDataset,
    source: &dyn Source,
    x: &str,
    y: &str,
    filters: &Map<String, Json>,
    color: Option<&str>,
    limit: Option<u32>,
) -> Result<Json> {
    let columns = source.columns()?;
    let xm = ds
        .metric(x)
        .ok_or_else(|| Error::Schema(format!("unknown metric {x:?}")))?;
    let ym = ds
        .metric(y)
        .ok_or_else(|| Error::Schema(format!("unknown metric {y:?}")))?;
    let xv = value_expr(xm, &columns)?;
    let yv = value_expr(ym, &columns)?;

    let color_col = match color {
        Some(name) => name.to_string(),
        // `first()` instead of `axes[0]`: a dataset without axes must fall
        // back to the id column rather than panic on an out-of-bounds index.
        None => ds
            .axes
            .first()
            .and_then(|axis| {
                axis.levels
                    .first()
                    .cloned()
                    .or_else(|| axis.path.as_ref().map(|p| p.column.clone()))
            })
            .unwrap_or_else(|| ds.id_column.clone()),
    };

    let mut lf = source.frame()?;
    for e in filter_exprs(ds, filters) {
        lf = lf.filter(e);
    }
    let df = lf
        .select([
            col(ds.id_column.as_str()).alias("id"),
            col(ds.label_column.as_str()).alias("label"),
            xv.alias("x"),
            yv.alias("y"),
            col(color_col.as_str()).alias("color_key"),
        ])
        .sort(
            ["x"],
            SortMultipleOptions::new()
                .with_order_descending(true)
                .with_nulls_last(true),
        )
        .limit(limit.unwrap_or(DEFAULT_SCATTER_LIMIT))
        .collect()?;

    Ok(Json::Array(
        (0..df.height())
            .map(|i| {
                json!({
                    "id": cell(&df, "id", i), "label": cell(&df, "label", i),
                    "x": cell(&df, "x", i), "y": cell(&df, "y", i),
                    "color_key": cell(&df, "color_key", i),
                })
            })
            .collect(),
    ))
}
pub fn geo(
ds: &FrameDataset,
source: &dyn Source,
key_column: &str,
metric: &str,
filters: &Map<String, Json>,
) -> Result<Json> {
let columns = source.columns()?;
if !columns.contains(key_column) {
return Err(Error::Schema(format!("unknown key column {key_column:?}")));
}
let m = ds
.metric(metric)
.ok_or_else(|| Error::Schema(format!("unknown metric {metric:?}")))?;
let mut lf = source.frame()?;
for e in filter_exprs(ds, filters) {
lf = lf.filter(e);
}
let masks = crate::metric::entity_mask_exprs(&ds.metrics, &ds.id_column, &columns)?;
if !masks.is_empty() {
lf = lf.with_columns(masks);
}
let region_agg = |mm: &crate::manifest::Metric| -> Result<Expr> {
let val = || -> Result<Expr> {
if mm.is_entity() {
Ok(col(format!("{}__eff", mm.id).as_str()))
} else {
value_expr(mm, &columns)
}
};
Ok(match mm.cross_agg() {
"count" => col(key_column).count(),
"count_distinct" => val()?.n_unique(),
"mean" | "weighted_mean" => val()?.mean(),
"median" => val()?.median(),
"min" => val()?.min(),
"max" => val()?.max(),
_ => val()?.sum(),
})
};
if m.is_ratio() {
let resolve = |which: &str, id: &Option<String>| -> Result<&crate::manifest::Metric> {
let id = id
.as_deref()
.ok_or_else(|| Error::Schema(format!("ratio metric {:?} needs a {which}", m.id)))?;
ds.metric(id)
.ok_or_else(|| Error::Schema(format!("ratio {:?} {which} {id:?} unknown", m.id)))
};
let num = resolve("numerator", &m.numerator)?;
let den = resolve("denominator", &m.denominator)?;
let df = lf
.filter(col(key_column).is_not_null())
.group_by([col(key_column)])
.agg([region_agg(num)?.alias("__n"), region_agg(den)?.alias("__d")])
.collect()?;
return Ok(Json::Array(
(0..df.height())
.map(|i| {
let n = cell(&df, "__n", i).as_f64();
let d = cell(&df, "__d", i).as_f64();
let value = match (n, d) {
(Some(n), Some(d)) if d != 0.0 => serde_json::Number::from_f64(n / d)
.map(Json::Number)
.unwrap_or(Json::Null),
_ => Json::Null,
};
json!({ "key": cell(&df, key_column, i), "value": value })
})
.collect(),
));
}
let df = lf
.filter(col(key_column).is_not_null())
.group_by([col(key_column)])
.agg([region_agg(m)?.alias("value")])
.collect()?;
Ok(Json::Array(
(0..df.height())
.map(|i| json!({ "key": cell(&df, key_column, i), "value": cell(&df, "value", i) }))
.collect(),
))
}
pub fn detail(ds: &FrameDataset, source: &dyn Source, eid: &str) -> Result<Option<Json>> {
let df = source
.frame()?
.filter(col(ds.id_column.as_str()).eq(lit(eid)))
.limit(1)
.collect()?;
if df.height() == 0 {
return Ok(None);
}
let units: Map<String, Json> = ds
.metrics
.iter()
.filter_map(|m| {
m.column
.as_ref()
.map(|c| (c.clone(), Json::String(m.unit.clone())))
})
.collect();
let mut facts = vec![];
let mut metrics = vec![];
for c in df.get_column_names() {
let name = c.as_str();
if name == ds.id_column {
continue;
}
let v = cell(&df, name, 0);
if let Some(unit) = units.get(name) {
metrics.push(json!({"label": name, "value": v, "unit": unit}));
} else if name != ds.label_column {
facts.push(json!({"label": name, "value": v}));
}
}
if let Some(fields) = &ds.detail_fields {
let pick = |name: &str| -> Option<Json> {
df.get_column_names()
.iter()
.any(|c| c.as_str() == name)
.then(|| json!({"label": name, "value": cell(&df, name, 0)}))
};
facts = fields.iter().filter_map(|f| pick(f)).collect();
} else {
facts.truncate(10);
}
let label = match cell(&df, &ds.label_column, 0) {
Json::String(s) if !s.is_empty() => s,
_ => eid.to_string(),
};
Ok(Some(
json!({"label": label, "facts": facts, "metrics": metrics}),
))
}
/// Searches the level columns of an axis for names matching `qstr`.
///
/// The query is trimmed and lowercased; an empty query yields an empty
/// array. The axis is `axis_id` when it resolves via `ds.axis`, otherwise the
/// dataset's first axis. When no axis is available, or the chosen axis has no
/// levels, the search is delegated to `leaf_search` on the label column.
///
/// Every whitespace-separated token of the query must occur in a level value
/// for it to match (see `token_match`). Each hit is an object with `name`,
/// `level_label`, `axis`, `path` (ancestor values followed by the name),
/// `is_leaf` and `id` (only set for the last level). Hits are ranked by:
/// names starting with the whole query first, then shorter names, then the
/// lowercased name; at most `max(limit, 1)` hits are returned.
///
/// # Errors
///
/// Propagates errors from the source and from query execution.
pub fn search(
    ds: &FrameDataset,
    source: &dyn Source,
    qstr: &str,
    axis_id: Option<&str>,
    limit: u32,
) -> Result<Json> {
    let needle = qstr.trim().to_lowercase();
    if needle.is_empty() {
        return Ok(Json::Array(vec![]));
    }
    let axis = axis_id.and_then(|a| ds.axis(a)).or_else(|| ds.axes.first());
    let axis = match axis {
        Some(a) if !a.levels.is_empty() => a,
        _ => return leaf_search(ds, source, &needle, limit),
    };
    let levels = &axis.levels;
    let base = source.frame()?;
    let cap = (limit as usize).max(1);
    // Each level over-fetches 4x the cap so ranking has candidates to choose
    // from. Saturate instead of `cap as u32 * 4`: that multiplication
    // overflowed for limits above u32::MAX / 4 (panic in debug builds, a
    // wrapped — possibly zero — fetch size in release builds).
    let fetch = limit.max(1).saturating_mul(4);
    let tokens: Vec<String> = needle.split_whitespace().map(String::from).collect();
    // (not a prefix match, name length, lowercased name, payload) — the first
    // three fields are the sort key.
    let mut rows: Vec<(bool, usize, String, Json)> = Vec::new();
    for (l, colname) in levels.iter().enumerate() {
        let is_leaf = l + 1 == levels.len();
        let label = axis.level_label(l);
        let matched = base
            .clone()
            .filter(col(colname.as_str()).is_not_null())
            .filter(token_match(colname, &tokens));
        // Carry one representative value of every ancestor level (and the id
        // on the leaf level) alongside each distinct name.
        let mut ancestors: Vec<Expr> = levels[..l]
            .iter()
            .map(|c| col(c.as_str()).first().alias(c.as_str()))
            .collect();
        if is_leaf {
            ancestors.push(col(ds.id_column.as_str()).first().alias("__id"));
        }
        // NOTE(review): neither branch sorts before `limit`, so which
        // candidates survive the over-fetch cut may depend on engine row
        // order — confirm whether that matters for large result sets.
        let df = if ancestors.is_empty() {
            matched
                .select([col(colname.as_str())])
                .unique(None, UniqueKeepStrategy::First)
                .limit(fetch)
                .collect()?
        } else {
            matched
                .group_by([col(colname.as_str())])
                .agg(ancestors)
                .limit(fetch)
                .collect()?
        };
        for i in 0..df.height() {
            let name = cell(&df, colname, i);
            let name_s = match &name {
                Json::String(s) => s.clone(),
                v => v.to_string(),
            };
            let path: Vec<Json> = levels[..=l].iter().map(|c| cell(&df, c, i)).collect();
            let id = if is_leaf {
                cell(&df, "__id", i)
            } else {
                Json::Null
            };
            let item = json!({
                "name": name, "level_label": label, "axis": axis.id,
                "path": path, "is_leaf": is_leaf, "id": id,
            });
            let lower = name_s.to_lowercase();
            rows.push((
                !lower.starts_with(&needle),
                name_s.chars().count(),
                lower,
                item,
            ));
        }
    }
    rows.sort_by(|a, b| {
        a.0.cmp(&b.0)
            .then(a.1.cmp(&b.1))
            .then_with(|| a.2.cmp(&b.2))
    });
    rows.truncate(cap);
    Ok(Json::Array(
        rows.into_iter().map(|(_, _, _, it)| it).collect(),
    ))
}
/// Searches the dataset's label column directly, without any axis.
///
/// `needle` is split on whitespace and every token must occur in the label
/// (see `token_match`). At most `limit` rows are returned, each shaped like a
/// leaf hit: `name` and the single-element `path` are the label, `id` comes
/// from the id column, and `level_label` / `axis` are `null`.
fn leaf_search(ds: &FrameDataset, source: &dyn Source, needle: &str, limit: u32) -> Result<Json> {
    let tokens: Vec<String> = needle.split_whitespace().map(String::from).collect();
    let projection = [
        col(ds.id_column.as_str()).alias("id"),
        col(ds.label_column.as_str()).alias("label"),
    ];
    let hits = source
        .frame()?
        .filter(token_match(&ds.label_column, &tokens))
        .select(projection)
        .limit(limit)
        .collect()?;

    let mut out = Vec::with_capacity(hits.height());
    for i in 0..hits.height() {
        let label = cell(&hits, "label", i);
        let id = cell(&hits, "id", i);
        out.push(json!({
            "name": label.clone(), "level_label": Json::Null, "axis": Json::Null,
            "path": [label], "is_leaf": true, "id": id,
        }));
    }
    Ok(Json::Array(out))
}
pub fn filter_options(
ds: &FrameDataset,
source: &dyn Source,
facet: &str,
q: Option<&str>,
limit: u32,
) -> Result<Vec<Json>> {
let f = match ds.filters.iter().find(|x| x.id == facet) {
Some(f) => f,
None => return Ok(vec![]),
};
if f.r#type == "tags" {
return Ok(match ds.tag_indices.get(facet) {
Some(idx) => idx
.options(q, limit as usize)
.into_iter()
.map(Json::String)
.collect(),
None => vec![],
});
}
let mut lf = source.frame()?.select([col(f.column.as_str()).alias("v")]);
lf = match q {
Some(q) => lf.filter(
col("v")
.str()
.to_lowercase()
.str()
.contains_literal(lit(q.to_lowercase())),
),
None => lf.filter(col("v").is_not_null()),
};
let df = lf
.unique(None, UniqueKeepStrategy::Any)
.sort(["v"], SortMultipleOptions::new().with_nulls_last(true))
.limit(limit)
.collect()?;
Ok((0..df.height()).map(|i| cell(&df, "v", i)).collect())
}