use std::collections::{BTreeSet, HashMap};
use polars::prelude::*;
use serde_json::{json, Map, Value as Json};
use crate::error::{Error, Result};
use crate::filters::{axis_row_filter, filter_exprs, focus_exprs};
use crate::manifest::FrameDataset;
use crate::metric::{metric_plan, value_expr};
use crate::output::av_to_label;
use crate::source::Source;
/// Upper bound applied to `SeriesArgs::top_k` when `series` ranks branches by size
/// (i.e. when no explicit `branches` list is given).
pub const MAX_TOP_K: usize = 50;
/// Translate a short resolution code into a Polars `dt().truncate` interval.
///
/// Recognised codes are `"w"`, `"m"`, `"q"` and `"y"`; any other code
/// (including `"d"`) buckets by day.
fn resolution_every(resolution: &str) -> &'static str {
    const INTERVALS: [(&str, &str); 4] = [("w", "1w"), ("m", "1mo"), ("q", "1q"), ("y", "1y")];
    INTERVALS
        .iter()
        .find(|(code, _)| *code == resolution)
        .map_or("1d", |&(_, every)| every)
}
/// Request parameters for [`series`].
///
/// Build one with [`SeriesArgs::new`] and override individual fields as needed.
#[derive(Debug, Clone)]
pub struct SeriesArgs {
    /// Axis id (looked up via `FrameDataset::axis`) whose levels define the branches.
    pub axis: String,
    /// Metric id (looked up via `FrameDataset::metric`) to plot.
    pub metric: String,
    /// Focus path values, applied as row filters via `focus_exprs`; its length selects
    /// which axis level is used as the branch column.
    pub focus: Vec<Json>,
    /// Dataset filters, turned into row predicates by `filter_exprs`.
    pub filters: Map<String, Json>,
    /// Cross-entity aggregation override (`"count"` counts distinct entities; `"mean"`,
    /// `"median"`, `"max"`, `"min"`; anything else sums). Defaults to the metric's
    /// `cross_agg()`.
    pub agg: Option<String>,
    /// Bucket resolution code: `"w"`, `"m"`, `"q"`, `"y"`; anything else is daily.
    pub resolution: String,
    /// When set, keep only rows dated at most this many days before the latest timestamp.
    pub window_days: Option<i64>,
    /// Number of branches kept when ranking by size (clamped to [`MAX_TOP_K`]); ignored
    /// when `branches` is set.
    pub top_k: usize,
    /// Label prefix for the aggregated remainder series.
    pub other_label: String,
    /// Metric id used to rank branches (resolved via `FrameDataset::resolve_size_by`);
    /// ignored when `branches` is set.
    pub size_by: Option<String>,
    /// Explicit branch keys to show, in order; `"__other__"` entries are ignored.
    pub branches: Option<Vec<String>>,
    /// Whether to emit the aggregated remainder (`"__other__"`) series.
    pub include_other: bool,
}
impl SeriesArgs {
    /// Create arguments for `axis` / `metric` with default settings: no focus and no
    /// filters, daily resolution, no time window, the top 12 branches by the default
    /// size metric, and an `"Other"` remainder series.
    pub fn new(axis: impl Into<String>, metric: impl Into<String>) -> Self {
        let (axis, metric) = (axis.into(), metric.into());
        Self {
            axis,
            metric,
            focus: Vec::new(),
            filters: Map::new(),
            agg: None,
            resolution: String::from("d"),
            window_days: None,
            top_k: 12,
            other_label: String::from("Other"),
            size_by: None,
            branches: None,
            include_other: true,
        }
    }
}
/// Arithmetic mean of `e`, ignoring nulls.
///
/// Uses Polars' native `mean`, which always produces a floating-point result.
/// A hand-written `sum / count` goes through `Operator::Divide`, which performs
/// integer division on integer inputs and truncates the mean
/// (e.g. `[1, 2]` would yield `1` instead of `1.5`).
fn mean_expr(e: Expr) -> Expr {
    e.mean()
}
/// Build per-branch time series of `args.metric`, split along `args.axis`.
///
/// Pipeline:
/// 1. Apply dataset filters, the axis row filter, entity masks and the focus path.
/// 2. Optionally keep only rows within `window_days` of the latest timestamp.
/// 3. Bucket timestamps to `args.resolution`. Fold each entity's rows inside a bucket
///    with the metric's time aggregation (skipped for `"count"`), then aggregate across
///    entities per (branch, bucket) with the cross aggregation.
/// 4. Forward-fill every branch onto the sorted union of bucket labels.
/// 5. Keep either the explicit `args.branches` or the top `args.top_k` branches, ranked
///    by the size metric in the latest bucket. The remainder is optionally merged into
///    a single `"__other__"` series.
///
/// Returns `{"series": [{key, label, dates, values, is_other}, ...], "meta": {...}}`.
///
/// # Errors
/// `Error::Schema` for an unknown axis, metric or size metric, a dataset without a
/// timestamp column, or an axis without levels. Errors from the source and from
/// Polars are propagated.
pub fn series(ds: &FrameDataset, source: &dyn Source, args: &SeriesArgs) -> Result<Json> {
    let columns = source.columns()?;
    let axis = ds
        .axis(&args.axis)
        .ok_or_else(|| Error::Schema(format!("unknown axis {:?}", args.axis)))?;
    let m = ds
        .metric(&args.metric)
        .ok_or_else(|| Error::Schema(format!("unknown metric {:?}", args.metric)))?;
    let ts = ds
        .timestamp_column
        .as_ref()
        .ok_or_else(|| Error::Schema("dataset has no timestamp_column".into()))?;
    // Cross-entity aggregation: explicit override, else the metric's default.
    let agg = args
        .agg
        .clone()
        .unwrap_or_else(|| m.cross_agg().to_string());
    let every = resolution_every(&args.resolution);
    let is_count = agg == "count";

    // Stage 1: fold each entity's rows inside a bucket into one value "v", using the
    // metric's time aggregation (falling back to `m.agg`). Counting distinct entities
    // works on the raw rows, so no fold is built for "count".
    let time_fold_expr = if is_count {
        None
    } else {
        let v = value_expr(m, &columns)?;
        let ta = m.time_agg.as_deref().unwrap_or(&m.agg);
        Some(
            match ta {
                "last" => v
                    .sort_by([col(ts.as_str())], SortMultipleOptions::default())
                    .last(),
                "first" => v
                    .sort_by([col(ts.as_str())], SortMultipleOptions::default())
                    .first(),
                "mean" => mean_expr(v),
                "median" => v.median(),
                "min" => v.min(),
                "max" => v.max(),
                _ => v.sum(),
            }
            .alias("v"),
        )
    };
    // Stage 2: combine the per-entity values of a (branch, bucket) group.
    let cross_expr = if is_count {
        col(ds.id_column.as_str()).n_unique().alias("v")
    } else {
        let c = col("v");
        match agg.as_str() {
            "mean" => mean_expr(c),
            "median" => c.median(),
            "max" => c.max(),
            "min" => c.min(),
            _ => c.sum(),
        }
        .alias("v")
    };

    let mut lf = source.frame()?;
    for e in filter_exprs(ds, &args.filters) {
        lf = lf.filter(e);
    }
    if let Some(pred) = axis_row_filter(axis, &columns)? {
        lf = lf.filter(pred);
    }
    let masks = crate::metric::entity_mask_exprs(&ds.metrics, &ds.id_column, &columns)?;
    let lf = if masks.is_empty() {
        lf
    } else {
        lf.with_columns(masks)
    };
    // The branch column is the axis level at depth `focus.len()`, falling back to the
    // deepest level.
    let f = args.focus.len();
    let (levels, lf) = crate::path::resolved_levels(axis, lf)?;
    let levels = &levels;
    let branch_col = levels
        .get(f)
        .or_else(|| levels.last())
        .cloned()
        .ok_or_else(|| Error::Schema("axis has no levels".into()))?;
    let mut lf = lf;
    for e in focus_exprs(levels, &args.focus) {
        lf = lf.filter(e);
    }
    if let Some(days) = args.window_days {
        let max_df = lf
            .clone()
            .select([col(ts.as_str())
                .cast(DataType::Date)
                .max()
                .alias("__max_ts")])
            .collect()?;
        let as_of: Option<i32> = max_df
            .column("__max_ts")
            .ok()
            .and_then(|c| c.get(0).ok())
            .and_then(|av| match av {
                AnyValue::Date(d) => Some(d),
                _ => None,
            });
        if let Some(as_of) = as_of {
            // Work in i64 and clamp. `days as i32` would silently truncate large
            // windows, and the i32 subtraction could overflow.
            let cutoff = i64::from(as_of)
                .saturating_sub(days)
                .clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32;
            lf = lf.filter(
                col(ts.as_str())
                    .cast(DataType::Date)
                    .gt_eq(lit(cutoff).cast(DataType::Date)),
            );
        }
    }

    // Bucket start date for every row at the requested resolution.
    let bucket = col(ts.as_str())
        .cast(DataType::Date)
        .dt()
        .truncate(lit(every))
        .cast(DataType::Date)
        .alias("t");
    let bucketed = lf.with_column(bucket);
    let staged = match &time_fold_expr {
        Some(fold) => bucketed
            .clone()
            .group_by([
                col(ds.id_column.as_str()),
                col(branch_col.as_str()),
                col("t"),
            ])
            .agg([fold.clone()]),
        None => bucketed.clone(),
    };
    let grouped = staged
        .clone()
        .group_by([col(branch_col.as_str()).alias("branch"), col("t")])
        .agg([cross_expr.clone()])
        .collect()?;

    // Collect branch -> (bucket label -> value), plus the union of all bucket labels.
    let branch_s = grouped.column("branch")?;
    let t_s = grouped.column("t")?;
    let v_s = grouped.column("v")?;
    let mut by_branch: HashMap<String, HashMap<String, f64>> = HashMap::new();
    let mut all_t: BTreeSet<String> = BTreeSet::new();
    for i in 0..grouped.height() {
        let (b_av, t_av) = (branch_s.get(i)?, t_s.get(i)?);
        if b_av.is_null() || t_av.is_null() {
            continue;
        }
        let b = av_to_label(&b_av.into_static());
        let t = av_to_label(&t_av.into_static());
        if let Some(v) = crate::output::av_to_f64(&v_s.get(i)?.into_static()) {
            by_branch.entry(b).or_default().insert(t.clone(), v);
        }
        all_t.insert(t);
    }
    // Shared date grid (sorted labels). Each branch is forward-filled onto it; entries
    // before a branch's first observation stay `None`.
    let grid: Vec<String> = all_t.into_iter().collect();
    let ffill = |d: &HashMap<String, f64>| -> Vec<Option<f64>> {
        let mut out = Vec::with_capacity(grid.len());
        let mut last: Option<f64> = None;
        for t in &grid {
            if let Some(v) = d.get(t) {
                last = Some(*v);
            }
            out.push(last);
        }
        out
    };
    let filled: HashMap<String, Vec<Option<f64>>> = by_branch
        .iter()
        .map(|(b, d)| (b.clone(), ffill(d)))
        .collect();

    let (top, rest): (Vec<&String>, Vec<&String>) = if let Some(keep) = &args.branches {
        // Explicit selection: requested branches that have data, in request order and
        // without duplicates. Every other branch becomes part of the remainder.
        let keep_set: std::collections::HashSet<&str> = keep
            .iter()
            .map(|s| s.as_str())
            .filter(|s| *s != "__other__")
            .collect();
        let mut seen: std::collections::HashSet<&String> = std::collections::HashSet::new();
        let top: Vec<&String> = keep
            .iter()
            .filter(|k| k.as_str() != "__other__")
            .filter_map(|k| filled.get_key_value(k).map(|(key, _)| key))
            .filter(|key| seen.insert(*key))
            .collect();
        let rest: Vec<&String> = filled
            .keys()
            .filter(|b| !keep_set.contains(b.as_str()))
            .collect();
        (top, rest)
    } else {
        // Automatic selection: rank branches by the size metric in the latest bucket.
        let size_by = ds
            .resolve_size_by(args.size_by.as_deref())
            .ok_or_else(|| Error::Schema("no size_by / metrics".into()))?;
        let size_m = ds
            .metric(&size_by)
            .ok_or_else(|| Error::Schema(format!("size_by {size_by:?} not a metric")))?;
        let count_rank = size_m.cross_agg() == "count";
        let size_plan = metric_plan(size_m, &columns)?;
        let rank_stat: Expr = if count_rank {
            col(ds.id_column.as_str()).n_unique().alias("_rankval")
        } else {
            size_plan.rank_expr.clone().alias("_rankval")
        };
        let max_bucket: Option<i32> = bucketed
            .clone()
            .select([col("t").cast(DataType::Date).max().alias("__max_b")])
            .collect()?
            .column("__max_b")
            .ok()
            .and_then(|c| c.get(0).ok())
            .and_then(|av| match av {
                AnyValue::Date(d) => Some(d),
                _ => None,
            });
        let rank_frame = match max_bucket {
            Some(b) => bucketed.clone().filter(
                col("t")
                    .cast(DataType::Date)
                    .eq(lit(b).cast(DataType::Date)),
            ),
            None => bucketed.clone(),
        };
        let size_rank = if count_rank {
            rank_frame
                .group_by([col(branch_col.as_str()).alias("branch")])
                .agg([rank_stat.clone()])
                .select([col("branch"), col("_rankval")])
                .collect()?
        } else {
            rank_frame
                .group_by([col(branch_col.as_str()).alias("branch")])
                .agg(
                    size_plan
                        .stats
                        .iter()
                        .map(|(_, e)| e.clone())
                        .collect::<Vec<_>>(),
                )
                .with_column(rank_stat.clone())
                .select([col("branch"), col("_rankval")])
                .collect()?
        };
        let mut rankval: HashMap<String, f64> = HashMap::new();
        {
            let (bs, rs) = (size_rank.column("branch")?, size_rank.column("_rankval")?);
            for i in 0..size_rank.height() {
                if bs.get(i)?.is_null() {
                    continue;
                }
                let b = av_to_label(&bs.get(i)?.into_static());
                // Rank NaN (e.g. a 0/0 ratio) like a missing value. Left in, it makes the
                // comparator below non-transitive, and `sort_by` may then misorder or panic.
                let r = crate::output::av_to_f64(&rs.get(i)?.into_static())
                    .filter(|r| !r.is_nan())
                    .unwrap_or(f64::NEG_INFINITY);
                rankval.insert(b, r);
            }
        }
        // Descending by rank value, ties broken by branch label for determinism.
        let mut ordered: Vec<&String> = filled.keys().collect();
        ordered.sort_by(|a, b| {
            let ra = rankval.get(*a).copied().unwrap_or(f64::NEG_INFINITY);
            let rb = rankval.get(*b).copied().unwrap_or(f64::NEG_INFINITY);
            rb.partial_cmp(&ra)
                .unwrap_or(std::cmp::Ordering::Equal)
                .then_with(|| a.cmp(b))
        });
        let top_k = args.top_k.min(MAX_TOP_K);
        let top: Vec<&String> = ordered.iter().take(top_k).copied().collect();
        let rest: Vec<&String> = ordered.iter().skip(top_k).copied().collect();
        (top, rest)
    };

    // Non-finite values cannot be represented in JSON and become null.
    let to_values = |vs: &[Option<f64>]| -> Vec<Json> {
        vs.iter()
            .map(|v| {
                v.and_then(|x| serde_json::Number::from_f64(x).map(Json::Number))
                    .unwrap_or(Json::Null)
            })
            .collect()
    };
    let mut out_series: Vec<Json> = top
        .iter()
        .map(|b| {
            json!({"key": b, "label": b, "dates": grid, "values": to_values(&filled[*b]), "is_other": false})
        })
        .collect();
    let mut branch_keys: Vec<Json> = top.iter().map(|b| json!(b)).collect();
    if !rest.is_empty() && args.include_other {
        // Re-aggregate the remaining branches together, from the per-entity stage, so the
        // remainder uses the same cross aggregation as individual branches.
        let rest_labels: Vec<String> = rest.iter().map(|s| s.to_string()).collect();
        let mask = col(branch_col.as_str())
            .cast(DataType::String)
            .is_in(lit(Series::new("".into(), rest_labels)).implode(), false);
        let other_df = staged
            .clone()
            .filter(mask)
            .group_by([col("t")])
            .agg([cross_expr])
            .collect()?;
        let mut other_map: HashMap<String, f64> = HashMap::new();
        let (ot, ov) = (other_df.column("t")?, other_df.column("v")?);
        for i in 0..other_df.height() {
            if let (false, Some(v)) = (
                ot.get(i)?.is_null(),
                crate::output::av_to_f64(&ov.get(i)?.into_static()),
            ) {
                other_map.insert(av_to_label(&ot.get(i)?.into_static()), v);
            }
        }
        out_series.push(json!({
            "key": "__other__", "label": format!("{} (n={})", args.other_label, rest.len()),
            "dates": grid, "values": to_values(&ffill(&other_map)), "is_other": true,
        }));
        branch_keys.push(json!("__other__"));
    }
    let cadence = match args.resolution.as_str() {
        "w" => "weekly",
        "m" => "monthly",
        "q" => "quarterly",
        "y" => "annual",
        _ => "daily",
    };
    Ok(json!({
        "series": out_series,
        "meta": {"cadence": cadence, "agg": agg, "unit": m.unit, "metric_id": args.metric,
            "branch_keys": branch_keys, "notice": Json::Null},
    }))
}
pub fn entity_series(
ds: &FrameDataset,
source: &dyn Source,
id: &str,
metric: &str,
window_days: Option<i64>,
resolution: &str,
) -> Result<Json> {
let columns = source.columns()?;
let m = ds
.metric(metric)
.ok_or_else(|| Error::Schema(format!("unknown metric {metric:?}")))?;
let ts = ds
.timestamp_column
.as_ref()
.ok_or_else(|| Error::Schema("dataset has no timestamp_column".into()))?;
let every = resolution_every(resolution);
let v = value_expr(m, &columns)?;
let ta = m.time_agg.as_deref().unwrap_or("last");
let agg_expr = match ta {
"last" => v
.sort_by([col(ts.as_str())], SortMultipleOptions::default())
.last(),
"first" => v
.sort_by([col(ts.as_str())], SortMultipleOptions::default())
.first(),
"mean" => mean_expr(v),
"median" => v.median(),
"min" => v.min(),
"max" => v.max(),
_ => v.sum(),
}
.alias("v");
let lf = source
.frame()?
.filter(col(ds.id_column.as_str()).eq(lit(id)));
let bucket = col(ts.as_str())
.cast(DataType::Date)
.dt()
.truncate(lit(every))
.cast(DataType::Date)
.alias("t");
let grouped = lf
.with_column(bucket)
.group_by([col("t")])
.agg([agg_expr])
.sort(["t"], SortMultipleOptions::default())
.collect()?;
let t_s = grouped.column("t")?;
let v_s = grouped.column("v")?;
let mut points: Vec<(String, f64)> = Vec::with_capacity(grouped.height());
for i in 0..grouped.height() {
if t_s.get(i)?.is_null() {
continue;
}
let t = av_to_label(&t_s.get(i)?.into_static());
if let Some(val) = crate::output::av_to_f64(&v_s.get(i)?.into_static()) {
points.push((t, val));
}
}
if let (Some(days), Some((max_t, _))) = (window_days, points.last().cloned()) {
if let Ok(as_of) = chrono_days(&max_t) {
let cutoff = as_of - days;
points.retain(|(t, _)| chrono_days(t).map(|d| d >= cutoff).unwrap_or(true));
}
}
let mut dates: Vec<Json> = Vec::with_capacity(points.len());
let mut values: Vec<Json> = Vec::with_capacity(points.len());
for (t, val) in &points {
dates.push(json!(t));
values.push(
serde_json::Number::from_f64(*val)
.map(Json::Number)
.unwrap_or(Json::Null),
);
}
let label = m.label.clone().unwrap_or_else(|| m.id.clone());
Ok(json!({
"dates": dates,
"values": values,
"unit": m.unit,
"label": label,
"metric_id": metric,
}))
}
/// Days since the Unix epoch (1970-01-01) for an ISO `YYYY-MM-DD` date string.
///
/// Implements Howard Hinnant's `days_from_civil` (proleptic Gregorian calendar).
/// Returns `Err(())` unless the input is exactly three `-`-separated integer fields
/// with a month in `1..=12` and a day in `1..=31`.
fn chrono_days(iso: &str) -> std::result::Result<i64, ()> {
    let mut parts = iso.split('-');
    // Parse the year as i32 so the later era arithmetic cannot overflow i64.
    let y: i32 = parts.next().and_then(|s| s.parse().ok()).ok_or(())?;
    let mo: i64 = parts.next().and_then(|s| s.parse().ok()).ok_or(())?;
    let d: i64 = parts.next().and_then(|s| s.parse().ok()).ok_or(())?;
    if parts.next().is_some() || !(1..=12).contains(&mo) || !(1..=31).contains(&d) {
        return Err(());
    }
    // Shift to a March-based year so the leap day falls at the end of the year.
    let y = i64::from(y) - i64::from(mo <= 2);
    // 400-year Gregorian era, using floor division so negative years work.
    let era = y.div_euclid(400);
    let yoe = y - era * 400; // year of era, [0, 399]
    let mp = (mo + 9) % 12; // March = 0 ... February = 11
    let doy = (153 * mp + 2) / 5 + d - 1; // day of year, [0, 365]
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy; // day of era, [0, 146096]
    Ok(era * 146_097 + doe - 719_468)
}