use indexmap::IndexMap;
use serde::{Deserialize, Serialize};
use serde_json::Value as Json;
// The three lists below, taken together, contain exactly the eight aggregation
// names that `Metric::validate` accepts as an `agg` / `rollup`.

/// Aggregation names grouped as "additive".
/// NOTE(review): presumably the aggregations whose partial results combine
/// directly — confirm against the aggregation engine.
pub const ADDITIVE: &[&str] = &["sum", "count", "min", "max"];
/// Aggregation names grouped as "sufficient statistic".
/// NOTE(review): presumably aggregations that need auxiliary state (e.g. sum and
/// count) to be combined — confirm against the aggregation engine.
pub const SUFFICIENT_STAT: &[&str] = &["mean", "weighted_mean"];
/// Aggregation names grouped as "non-decomposable".
/// NOTE(review): presumably aggregations that cannot be rebuilt from partial
/// results — confirm against the aggregation engine.
pub const NON_DECOMPOSABLE: &[&str] = &["count_distinct", "median"];
/// Serde default for `Metric::agg`: the string `"sum"`.
fn default_sum() -> String {
    String::from("sum")
}
/// Serde default for `Metric::unit`: the string `"number"`.
fn default_number() -> String {
    String::from("number")
}
/// Serde default for `Metric::kind`: the string `"additive"`.
fn default_additive() -> String {
    String::from("additive")
}
/// Serde default for `Pick::take`: the string `"last"`.
fn default_last() -> String {
    String::from("last")
}
/// Serde default for `Metric::null_policy`: the string `"drop"`.
fn default_drop() -> String {
    String::from("drop")
}
/// Serde default for `Filter::type`: the string `"categorical"`.
fn default_categorical() -> String {
    String::from("categorical")
}
/// Serde default for the `title` field of `Dataset` and `FrameDataset`: `"taxa"`.
fn default_title() -> String {
    String::from("taxa")
}
/// Serde default for the `entity_noun` field of `Dataset` and `FrameDataset`: `"entity"`.
fn default_entity() -> String {
    String::from("entity")
}
/// Serde default for the `entity_noun_plural` field of `Dataset` and
/// `FrameDataset`: `"entities"`.
fn default_entities() -> String {
    String::from("entities")
}
/// The optional `pick` specification carried by a [`Metric`].
///
/// NOTE(review): the field names suggest "order rows by `by`, keep the `take` one";
/// the code visible here never interprets them — confirm against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Pick {
    /// Required. Presumably a column name — TODO confirm.
    pub by: String,
    /// Defaults to `"last"` when omitted from the input.
    #[serde(default = "default_last")]
    pub take: String,
}
/// One metric declared on a frame.
///
/// Optional fields are omitted from serialized output when unset; `grain` is
/// omitted when empty. [`Metric::validate`] enforces the cross-field rules
/// mentioned on the individual fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Metric {
    /// Identifier; `FrameDataset::metric` looks metrics up by this value.
    pub id: String,
    /// Metric kind. `validate` accepts `additive`, `entity` or `ratio`;
    /// defaults to `"additive"`.
    #[serde(default = "default_additive")]
    pub kind: String,
    /// Aggregation name; defaults to `"sum"`. Returned by `cross_agg` for every
    /// kind except `entity`.
    #[serde(default = "default_sum")]
    pub agg: String,
    /// Explicit time aggregation. `validate` accepts `last`, `first`, `mean`,
    /// `median`, `sum`, `min` and `max`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub time_agg: Option<String>,
    /// Presumably the source column the metric reads — TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub column: Option<String>,
    /// Free-form JSON; not interpreted by the code visible here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub formula: Option<Json>,
    /// Required by `validate` when the cross aggregation is `weighted_mean`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub weight_column: Option<String>,
    /// Must be non-empty for `entity` metrics (checked by `validate`).
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub grain: Vec<String>,
    /// Optional [`Pick`]; not interpreted by the code visible here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pick: Option<Pick>,
    /// Cross aggregation for `entity` metrics; `cross_agg` falls back to `"sum"`
    /// when this is unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub rollup: Option<String>,
    /// Required, together with `denominator`, for `ratio` metrics.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub numerator: Option<String>,
    /// Required, together with `numerator`, for `ratio` metrics.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub denominator: Option<String>,
    /// Defaults to `"number"`.
    #[serde(default = "default_number")]
    pub unit: String,
    /// Optional label; presumably for display — TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    /// Defaults to `"drop"`. Not validated by the code visible here.
    #[serde(default = "default_drop")]
    pub null_policy: String,
}
impl Metric {
    /// Returns the effective `time_agg` for this metric.
    ///
    /// An explicit `time_agg` always wins. Without one, `entity` metrics resolve
    /// to `"last"` and every other kind reuses `agg`.
    pub fn resolved_time_agg(&self) -> &str {
        match self.time_agg.as_deref() {
            Some(explicit) => explicit,
            None if self.is_entity() => "last",
            None => self.agg.as_str(),
        }
    }

    /// `true` when `kind` is exactly `"entity"`.
    pub fn is_entity(&self) -> bool {
        matches!(self.kind.as_str(), "entity")
    }

    /// `true` when `kind` is exactly `"ratio"`.
    pub fn is_ratio(&self) -> bool {
        matches!(self.kind.as_str(), "ratio")
    }

    /// Returns the cross aggregation name: `rollup` (or `"sum"` when unset) for
    /// `entity` metrics, `agg` for every other kind.
    pub fn cross_agg(&self) -> &str {
        match (self.is_entity(), self.rollup.as_deref()) {
            (false, _) => self.agg.as_str(),
            (true, Some(rollup)) => rollup,
            (true, None) => "sum",
        }
    }

    /// Checks the metric's fields against each other.
    ///
    /// The checks run in a fixed order and the first failure is returned:
    /// unknown `kind`, unknown cross aggregation, `entity` without `grain`,
    /// `weighted_mean` without `weight_column`, `ratio` without both
    /// `numerator` and `denominator`, and finally an unknown `time_agg`.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message describing the first violated rule.
    pub fn validate(&self) -> Result<(), String> {
        const KINDS: &[&str] = &["additive", "entity", "ratio"];
        const AGGS: &[&str] = &[
            "sum",
            "count",
            "min",
            "max",
            "mean",
            "weighted_mean",
            "count_distinct",
            "median",
        ];
        const TIME_AGGS: &[&str] = &["last", "first", "mean", "median", "sum", "min", "max"];

        if KINDS.iter().all(|known| *known != self.kind) {
            return Err(format!(
                "unknown metric kind {:?} (additive|entity|ratio)",
                self.kind
            ));
        }

        let entity = self.is_entity();
        let cross = self.cross_agg();
        if AGGS.iter().all(|known| *known != cross) {
            // Name the field the offending value actually came from.
            let field = if entity { "rollup" } else { "agg" };
            return Err(format!("unknown {} {:?}", field, cross));
        }

        if entity && self.grain.is_empty() {
            return Err(format!(
                "entity metric {:?} requires a non-empty `grain`",
                self.id
            ));
        }

        if cross == "weighted_mean" && self.weight_column.is_none() {
            return Err("weighted_mean requires weight_column".into());
        }

        let ratio_complete = self.numerator.is_some() && self.denominator.is_some();
        if self.is_ratio() && !ratio_complete {
            return Err(format!(
                "ratio metric {:?} requires `numerator` and `denominator`",
                self.id
            ));
        }

        match &self.time_agg {
            Some(ta) if !TIME_AGGS.contains(&ta.as_str()) => {
                Err(format!("unknown time_agg {ta:?}"))
            }
            _ => Ok(()),
        }
    }
}
/// Serde default for `PathSpec::sep`: the string `"/"`.
fn default_sep() -> String {
    String::from("/")
}
/// The optional `path` specification of an [`Axis`].
///
/// NOTE(review): presumably `column` holds a delimited path that is split on
/// `sep` to derive levels — confirm against the consumer; this file only
/// declares the shape.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PathSpec {
    /// Required column name.
    pub column: String,
    /// Separator string; defaults to `"/"`.
    #[serde(default = "default_sep")]
    pub sep: String,
}
/// One axis of a dataset.
///
/// Optional fields are omitted from serialized output when unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Axis {
    /// Identifier; `Dataset::axis` and `FrameDataset::axis` look axes up by it.
    pub id: String,
    /// Ordered level names; empty when omitted. `level_label` derives a
    /// fallback label from these.
    #[serde(default)]
    pub levels: Vec<String>,
    /// Explicit per-level labels, matched to `levels` by index; empty when omitted.
    #[serde(default)]
    pub level_labels: Vec<String>,
    /// Optional label for the axis itself.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label: Option<String>,
    /// Optional [`PathSpec`]; not interpreted by the code visible here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub path: Option<PathSpec>,
    /// Free-form JSON; not interpreted by the code visible here.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub row_filter: Option<Json>,
    /// Presumably a metric id used for sizing by default on this axis — TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub default_size_by: Option<String>,
    /// Presumably the metric ids offered for sizing on this axis — TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub size_by: Option<Vec<String>>,
}
impl Axis {
    /// Returns the label for level `i`.
    ///
    /// A non-empty entry in `level_labels` is returned as-is. Otherwise the
    /// label is derived from `levels[i]` by replacing underscores with spaces
    /// and upper-casing the first character when it is ASCII. An index past
    /// the end of `levels` yields an empty string.
    pub fn level_label(&self, i: usize) -> String {
        let explicit = self.level_labels.get(i).filter(|label| !label.is_empty());
        if let Some(label) = explicit {
            return label.clone();
        }

        self.levels
            .get(i)
            .map(|column| {
                let mut derived = column.replace('_', " ");
                // `get_mut(..1)` is `None` for an empty string or a multi-byte
                // first character, so such names pass through uncapitalised.
                if let Some(head) = derived.get_mut(..1) {
                    head.make_ascii_uppercase();
                }
                derived
            })
            .unwrap_or_default()
    }
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Filter {
pub id: String,
#[serde(default)]
pub column: String,
#[serde(default = "default_categorical")]
pub r#type: String, #[serde(default, skip_serializing_if = "Option::is_none")]
pub label: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tags_frame: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub entity_column: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tag_column: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub control: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub default: Option<serde_json::Value>,
}
/// How a named source is described in the manifest.
///
/// Variant names are serialized in snake case, so the `Sql` variant appears
/// as `{"sql": {"dsn": ..., "query": ...}}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SourceSpec {
    /// A SQL source described by a `dsn` string and a `query` string.
    Sql { dsn: String, query: String },
}
/// One named frame of a [`Dataset`].
///
/// `Dataset::frame_dataset` flattens a frame together with the dataset-level
/// settings into a `FrameDataset`. Optional fields are omitted from serialized
/// output when unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Frame {
    /// Source name; `frame_dataset` substitutes an empty string when unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub source: Option<String>,
    /// Optional transform reference, copied through unchanged by `frame_dataset`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub transform: Option<String>,
    /// Required id column.
    pub id_column: String,
    /// Label column; `frame_dataset` falls back to `id_column` when unset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub label_column: Option<String>,
    /// Metrics declared on this frame; empty when omitted.
    #[serde(default)]
    pub metrics: Vec<Metric>,
    /// Timestamp column. When the sole frame has one, `Dataset::resolved_views`
    /// also synthesises a `series` view.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timestamp: Option<String>,
    /// Copied to `FrameDataset::series_resolutions`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resolutions: Option<Vec<String>>,
    /// Copied to `FrameDataset::series_default_resolution`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub default_resolution: Option<String>,
}
/// Binding of a named view to a frame.
///
/// Optional fields are omitted from serialized output when unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct View {
    /// Name of the frame the view is bound to.
    pub frame: String,
    /// NOTE(review): presumably names a frame to take dimensions from — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub dims_from: Option<String>,
    /// NOTE(review): presumably names another view whose branches are reused — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub branch_set: Option<String>,
    /// Frame used for series data. Default `detail` views point this at the
    /// sole frame when that frame has a timestamp.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub series_frame: Option<String>,
}
/// The top-level manifest: axes, filters, sources, frames and view bindings.
///
/// `sources`, `frames` and `views` are `IndexMap`s, so the authored key order
/// is kept. Optional fields with `skip_serializing_if` are omitted from
/// serialized output when unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Dataset {
    /// Required list of axes.
    pub axes: Vec<Axis>,
    /// Filters; empty when omitted.
    #[serde(default)]
    pub filters: Vec<Filter>,
    /// Defaults to `"taxa"`.
    #[serde(default = "default_title")]
    pub title: String,
    /// Defaults to `"entity"`.
    #[serde(default = "default_entity")]
    pub entity_noun: String,
    /// Defaults to `"entities"`.
    #[serde(default = "default_entities")]
    pub entity_noun_plural: String,
    /// Presumably the id of the axis selected by default — TODO confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub default_axis: Option<String>,
    /// Second choice in `FrameDataset::resolve_size_by`, after an explicit request.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub default_size_by: Option<String>,
    /// Named source specifications.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub sources: Option<IndexMap<String, SourceSpec>>,
    /// Required map of frame name to frame.
    pub frames: IndexMap<String, Frame>,
    /// Explicit view bindings; when unset, `resolved_views` synthesises defaults.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub views: Option<IndexMap<String, View>>,
    /// Copied through to `FrameDataset::detail_fields`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail_fields: Option<Vec<String>>,
    /// Defaults to `Some(2)` when the key is omitted.
    #[serde(default = "default_lookahead")]
    pub lookahead: Option<i64>,
    /// Defaults to `12`.
    #[serde(default = "default_branch_cap")]
    pub branch_cap: i64,
    /// Defaults to `50`.
    #[serde(default = "default_leaf_cap")]
    pub leaf_cap: i64,
    /// Defaults to `2`.
    #[serde(default = "default_levels")]
    pub default_levels: i64,
}
/// Serde default for the `lookahead` field of `Dataset` and `FrameDataset`: `Some(2)`.
fn default_lookahead() -> Option<i64> {
    const DEFAULT_LOOKAHEAD: i64 = 2;
    Some(DEFAULT_LOOKAHEAD)
}
/// Serde default for the `branch_cap` field of `Dataset` and `FrameDataset`: `12`.
fn default_branch_cap() -> i64 {
    const DEFAULT_BRANCH_CAP: i64 = 12;
    DEFAULT_BRANCH_CAP
}
/// Serde default for the `leaf_cap` field of `Dataset` and `FrameDataset`: `50`.
fn default_leaf_cap() -> i64 {
    const DEFAULT_LEAF_CAP: i64 = 50;
    DEFAULT_LEAF_CAP
}
/// Serde default for the `default_levels` field of `Dataset` and `FrameDataset`: `2`.
fn default_levels() -> i64 {
    const DEFAULT_LEVELS: i64 = 2;
    DEFAULT_LEVELS
}
/// The four view names, in the order `Dataset::resolved_views` inserts its
/// synthesised defaults (`series` only when the sole frame has a timestamp).
/// Not referenced by the code visible here.
pub const VIEW_NAMES: &[&str] = &["treemap", "scatter", "detail", "series"];
impl Dataset {
    /// Looks up an axis by its `id`; the first match wins.
    pub fn axis(&self, id: &str) -> Option<&Axis> {
        self.axes.iter().find(|a| a.id == id)
    }

    /// Looks up a frame by its key in `frames`.
    pub fn frame(&self, name: &str) -> Option<&Frame> {
        self.frames.get(name)
    }

    /// Returns the name of the only frame, or `None` when there are zero or
    /// several frames.
    fn sole_frame(&self) -> Option<&String> {
        if self.frames.len() == 1 {
            self.frames.keys().next()
        } else {
            None
        }
    }

    /// Returns the view bindings in effect.
    ///
    /// Declared `views` are returned as a clone, unchanged. Otherwise defaults
    /// are synthesised for a single-frame manifest: `treemap`, `scatter` and
    /// `detail` all bind to the sole frame, and when that frame has a
    /// `timestamp` the `detail` view also gets it as `series_frame` and a
    /// `series` view is added.
    ///
    /// # Errors
    ///
    /// Without declared `views`, fails when the manifest has no frames at all
    /// or has more than one frame (the bindings would be ambiguous).
    pub fn resolved_views(&self) -> Result<IndexMap<String, View>, String> {
        if let Some(declared) = &self.views {
            return Ok(declared.clone());
        }

        // Zero frames is a different mistake from "several frames, no views";
        // report it as such instead of claiming the manifest has multiple frames.
        if self.frames.is_empty() {
            return Err(
                "manifest declares no frames and no `views` — there is nothing to bind \
                 default views to; declare at least one frame"
                    .to_string(),
            );
        }

        let sole: &str = self.sole_frame().ok_or_else(|| {
            "manifest has multiple frames but no `views` — bindings are ambiguous; \
             declare a `views` map"
                .to_string()
        })?;

        let mk = |frame: &str| View {
            frame: frame.to_string(),
            dims_from: None,
            branch_set: None,
            series_frame: None,
        };
        let has_ts = self
            .frame(sole)
            .and_then(|f| f.timestamp.as_ref())
            .is_some();

        let mut views: IndexMap<String, View> = IndexMap::new();
        views.insert("treemap".into(), mk(sole));
        views.insert("scatter".into(), mk(sole));

        let mut detail = mk(sole);
        if has_ts {
            detail.series_frame = Some(sole.to_string());
        }
        views.insert("detail".into(), detail);

        if has_ts {
            views.insert("series".into(), mk(sole));
        }
        Ok(views)
    }

    /// Returns a clone of the view named `name`, or `None` when it does not
    /// exist or the default views cannot be resolved.
    pub fn view(&self, name: &str) -> Option<View> {
        match &self.views {
            // Declared views: read the entry directly rather than cloning the
            // whole map just to pick one value out of it.
            Some(declared) => declared.get(name).cloned(),
            None => self.resolved_views().ok()?.get(name).cloned(),
        }
    }

    /// Flattens the frame `frame_name` and the dataset-level settings into a
    /// [`FrameDataset`].
    ///
    /// `source` becomes an empty string when the frame has none, `label_column`
    /// falls back to `id_column`, `tag_indices` starts empty, and
    /// `series_source` / `series_metrics` are always `None`.
    ///
    /// # Errors
    ///
    /// Returns `Error::Schema` when `frame_name` is not a key of `frames`.
    pub fn frame_dataset(&self, frame_name: &str) -> crate::error::Result<FrameDataset> {
        let f = self.frame(frame_name).ok_or_else(|| {
            crate::error::Error::Schema(format!("frame {frame_name:?} not in `frames`"))
        })?;
        Ok(FrameDataset {
            source: f.source.clone().unwrap_or_default(),
            transform: f.transform.clone(),
            tag_indices: std::collections::HashMap::new(),
            id_column: f.id_column.clone(),
            label_column: f
                .label_column
                .clone()
                .unwrap_or_else(|| f.id_column.clone()),
            metrics: f.metrics.clone(),
            timestamp_column: f.timestamp.clone(),
            series_resolutions: f.resolutions.clone(),
            series_default_resolution: f.default_resolution.clone(),
            series_source: None,
            series_metrics: None,
            axes: self.axes.clone(),
            filters: self.filters.clone(),
            title: self.title.clone(),
            entity_noun: self.entity_noun.clone(),
            entity_noun_plural: self.entity_noun_plural.clone(),
            default_axis: self.default_axis.clone(),
            default_size_by: self.default_size_by.clone(),
            detail_fields: self.detail_fields.clone(),
            lookahead: self.lookahead,
            branch_cap: self.branch_cap,
            leaf_cap: self.leaf_cap,
            default_levels: self.default_levels,
        })
    }
}
/// A single frame flattened together with the dataset-level settings.
///
/// Built by `Dataset::frame_dataset`. Optional fields with
/// `skip_serializing_if` are omitted from serialized output when unset.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FrameDataset {
    /// Source name; empty when the frame declared none.
    #[serde(default)]
    pub source: String,
    /// Id column of the frame.
    #[serde(default)]
    pub id_column: String,
    /// Label column; `frame_dataset` fills in `id_column` when the frame has none.
    #[serde(default)]
    pub label_column: String,
    /// Never serialized or deserialized; `frame_dataset` starts it empty.
    /// NOTE(review): presumably populated later by the tag-loading code — confirm.
    #[serde(skip)]
    pub tag_indices: std::collections::HashMap<String, crate::tags::TagIndex>,
    /// Axes copied from the dataset.
    pub axes: Vec<Axis>,
    /// Metrics copied from the frame.
    #[serde(default)]
    pub metrics: Vec<Metric>,
    /// Filters copied from the dataset.
    #[serde(default)]
    pub filters: Vec<Filter>,
    /// The frame's `timestamp`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub timestamp_column: Option<String>,
    /// Defaults to `"taxa"`.
    #[serde(default = "default_title")]
    pub title: String,
    /// Defaults to `"entity"`.
    #[serde(default = "default_entity")]
    pub entity_noun: String,
    /// Defaults to `"entities"`.
    #[serde(default = "default_entities")]
    pub entity_noun_plural: String,
    /// Copied from the dataset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub default_axis: Option<String>,
    /// Second choice in `resolve_size_by`, after an explicit request.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub default_size_by: Option<String>,
    /// The frame's `transform`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub transform: Option<String>,
    /// Always `None` when built by `frame_dataset`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub series_source: Option<String>,
    /// Always `None` when built by `frame_dataset`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub series_metrics: Option<Vec<String>>,
    /// The frame's `resolutions`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub series_resolutions: Option<Vec<String>>,
    /// The frame's `default_resolution`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub series_default_resolution: Option<String>,
    /// Copied from the dataset.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub detail_fields: Option<Vec<String>>,
    /// Defaults to `Some(2)` when the key is omitted.
    #[serde(default = "default_lookahead")]
    pub lookahead: Option<i64>,
    /// Defaults to `12`.
    #[serde(default = "default_branch_cap")]
    pub branch_cap: i64,
    /// Defaults to `50`.
    #[serde(default = "default_leaf_cap")]
    pub leaf_cap: i64,
    /// Defaults to `2`.
    #[serde(default = "default_levels")]
    pub default_levels: i64,
}
impl FrameDataset {
pub fn metric(&self, id: &str) -> Option<&Metric> {
self.metrics.iter().find(|m| m.id == id)
}
pub fn axis(&self, id: &str) -> Option<&Axis> {
self.axes.iter().find(|a| a.id == id)
}
pub fn resolve_size_by(&self, size_by: Option<&str>) -> Option<String> {
size_by
.map(str::to_string)
.or_else(|| self.default_size_by.clone())
.or_else(|| self.metrics.first().map(|m| m.id.clone()))
}
pub fn validate(&self) -> Result<(), String> {
for m in &self.metrics {
m.validate()?;
}
Ok(())
}
}
// Unit tests covering manifest deserialization, default view synthesis,
// frame flattening via `frame_dataset`, and metric validation.
#[cfg(test)]
mod tests {
use super::*;
// A single-frame manifest without a timestamp: serde defaults apply, three
// default views are synthesised, and the frame flattens into a `FrameDataset`.
#[test]
fn single_frame_manifest_round_trips() {
let json = r#"{
"axes": [{"id": "owner_repo", "levels": ["owner", "repo"]}],
"frames": {"main": {
"source": "repos", "id_column": "repo", "label_column": "repo",
"metrics": [
{"id": "stars", "agg": "sum", "column": "stars", "unit": "count"},
{"id": "repos", "agg": "count", "unit": "count"}
]
}},
"default_axis": "owner_repo", "default_size_by": "stars"
}"#;
let ds: Dataset = serde_json::from_str(json).unwrap();
assert_eq!(ds.axis("owner_repo").unwrap().levels, ["owner", "repo"]);
// `title` was omitted, so the serde default is used.
assert_eq!(ds.title, "taxa");
let views = ds.resolved_views().unwrap();
// No timestamp on the sole frame, so no `series` view is synthesised.
assert_eq!(
views.keys().collect::<Vec<_>>(),
["treemap", "scatter", "detail"]
);
let fd = ds.frame_dataset("main").unwrap();
assert_eq!(fd.source, "repos");
assert_eq!(fd.id_column, "repo");
assert_eq!(fd.metric("repos").unwrap().agg, "count");
assert_eq!(fd.resolve_size_by(None).as_deref(), Some("stars"));
// An unknown frame name is an error whose message names the frame.
let err = ds.frame_dataset("does-not-exist");
assert!(err.is_err(), "unknown frame must error, not panic");
assert!(format!("{}", err.unwrap_err()).contains("does-not-exist"));
fd.validate().unwrap();
assert_eq!(fd.metric("stars").unwrap().null_policy, "drop");
}
// A sole frame with a timestamp additionally gets a `series` view, and the
// default `detail` view points `series_frame` at that frame.
#[test]
fn single_frame_with_timestamp_gets_default_series_view() {
let json = r#"{
"axes": [{"id": "g", "levels": ["g", "id"]}],
"frames": {"main": {
"source": "tvl", "id_column": "id", "timestamp": "dt",
"metrics": [{"id": "tvl", "agg": "sum", "column": "tvl"}],
"resolutions": ["w"], "default_resolution": "w"
}}
}"#;
let ds: Dataset = serde_json::from_str(json).unwrap();
let views = ds.resolved_views().unwrap();
assert_eq!(
views.keys().collect::<Vec<_>>(),
["treemap", "scatter", "detail", "series"]
);
assert_eq!(views["detail"].series_frame.as_deref(), Some("main"));
assert_eq!(views["series"].frame, "main");
let fd = ds.frame_dataset("main").unwrap();
assert_eq!(fd.timestamp_column.as_deref(), Some("dt"));
assert_eq!(
fd.series_resolutions.as_deref(),
Some(["w".to_string()].as_slice())
);
assert_eq!(fd.series_default_resolution.as_deref(), Some("w"));
}
// Every name in `Metric::validate`'s time-aggregation list is accepted and an
// unknown name is rejected.
#[test]
fn validate_accepts_engine_time_aggs() {
let mk = |ta: &str| Metric {
id: "m".into(),
kind: "additive".into(),
agg: "sum".into(),
time_agg: Some(ta.into()),
column: Some("c".into()),
formula: None,
weight_column: None,
grain: vec![],
pick: None,
rollup: None,
numerator: None,
denominator: None,
unit: "number".into(),
label: None,
null_policy: "drop".into(),
};
for ta in ["last", "first", "mean", "median", "sum", "min", "max"] {
assert!(mk(ta).validate().is_ok(), "time_agg {ta:?} must validate");
}
assert!(mk("bogus").validate().is_err());
}
// With more than one frame and no `views`, default bindings cannot be resolved.
#[test]
fn multi_frame_without_views_is_ambiguous() {
let json = r#"{
"axes": [{"id": "a", "levels": ["a", "id"]}],
"frames": {
"snapshot": {"source": "snap", "id_column": "id",
"metrics": [{"id": "m", "agg": "sum", "column": "m"}]},
"series": {"source": "facts", "id_column": "id", "timestamp": "dt",
"metrics": [{"id": "m", "agg": "sum", "column": "m"}]}
}
}"#;
let ds: Dataset = serde_json::from_str(json).unwrap();
assert!(ds.resolved_views().is_err());
}
// A multi-frame manifest with explicit `views`: key order is preserved, each
// frame flattens with the dataset-level settings, and the manifest survives a
// serialize/deserialize round trip.
#[test]
fn frames_views_round_trip_and_frame_dataset_resolves() {
let json = r#"{
"title": "Companies", "entity_noun": "company",
"axes": [{"id": "sector", "levels": ["sector", "symbol"]}],
"filters": [{"id": "sector", "column": "sector", "type": "categorical"}],
"sources": {
"snap": {"sql": {"dsn": "host=/tmp dbname=x", "query": "SELECT * FROM snap"}},
"facts": {"sql": {"dsn": "host=/tmp dbname=x", "query": "SELECT * FROM facts"}}
},
"frames": {
"snapshot": {"source": "snap", "id_column": "symbol", "label_column": "name",
"metrics": [{"id": "mcap", "agg": "sum", "column": "mcap", "unit": "money"}]},
"series": {"source": "facts", "id_column": "symbol", "timestamp": "date",
"metrics": [{"id": "mcap_usd", "agg": "sum", "column": "mcap_usd"}],
"resolutions": ["w"], "default_resolution": "w"}
},
"views": {
"treemap": {"frame": "snapshot"},
"scatter": {"frame": "snapshot"},
"detail": {"frame": "snapshot", "series_frame": "series"},
"series": {"frame": "series", "dims_from": "snapshot", "branch_set": "treemap"}
}
}"#;
let ds: Dataset = serde_json::from_str(json).unwrap();
assert_eq!(ds.frames.keys().collect::<Vec<_>>(), ["snapshot", "series"]);
let views = ds.views.as_ref().expect("views present");
assert_eq!(
views.keys().collect::<Vec<_>>(),
["treemap", "scatter", "detail", "series"]
);
assert_eq!(
ds.view("series").unwrap().dims_from.as_deref(),
Some("snapshot")
);
assert_eq!(
ds.view("series").unwrap().branch_set.as_deref(),
Some("treemap")
);
assert_eq!(
ds.view("detail").unwrap().series_frame.as_deref(),
Some("series")
);
// The snapshot frame carries its own columns plus the shared axes/filters/title.
let snap = ds.frame_dataset("snapshot").unwrap();
assert_eq!(snap.source, "snap");
assert_eq!(snap.id_column, "symbol");
assert_eq!(snap.label_column, "name");
assert_eq!(snap.metrics.len(), 1);
assert_eq!(snap.metrics[0].id, "mcap");
assert!(snap.timestamp_column.is_none());
assert_eq!(snap.axes.len(), 1);
assert_eq!(snap.axes[0].id, "sector");
assert_eq!(snap.filters.len(), 1);
assert_eq!(snap.title, "Companies");
assert_eq!(snap.entity_noun, "company");
// The series frame exposes its timestamp and resolution settings.
let ser = ds.frame_dataset("series").unwrap();
assert_eq!(ser.source, "facts");
assert_eq!(ser.timestamp_column.as_deref(), Some("date"));
assert_eq!(ser.metrics[0].id, "mcap_usd");
assert_eq!(
ser.series_resolutions.as_deref(),
Some(["w".to_string()].as_slice())
);
assert_eq!(ser.series_default_resolution.as_deref(), Some("w"));
assert_eq!(ser.axes[0].id, "sector");
// Serialize and parse again: frame order and view bindings must survive.
let text = serde_json::to_string(&ds).unwrap();
let ds2: Dataset = serde_json::from_str(&text).unwrap();
assert_eq!(
ds2.frames.keys().collect::<Vec<_>>(),
["snapshot", "series"]
);
assert_eq!(
ds2.view("series").unwrap().dims_from.as_deref(),
Some("snapshot")
);
}
// `sources` and a frame-level `transform` deserialize, and the authored
// source order is kept both in memory and in the serialized text.
#[test]
fn manifest_with_sources_and_transform_round_trips() {
let json = r#"{
"axes": [{"id": "owner_repo", "levels": ["owner", "repo"]}],
"sources": {
"repos": {"sql": {"dsn": "host=/tmp dbname=investing", "query": "SELECT * FROM repos"}},
"owners": {"sql": {"dsn": "host=/tmp dbname=investing", "query": "SELECT * FROM owners"}}
},
"frames": {"main": {
"source": "repos", "transform": "plan.bin", "id_column": "repo", "label_column": "repo",
"metrics": [{"id": "stars", "agg": "sum", "column": "stars"}]
}}
}"#;
let ds: Dataset = serde_json::from_str(json).unwrap();
let sources = ds.sources.as_ref().expect("sources present");
assert_eq!(sources.keys().collect::<Vec<_>>(), ["repos", "owners"]);
// `SourceSpec` has a single variant, so this `let` pattern is irrefutable.
let SourceSpec::Sql { dsn, query } = &sources["repos"];
assert_eq!(dsn, "host=/tmp dbname=investing");
assert_eq!(query, "SELECT * FROM repos");
assert_eq!(
ds.frame("main").unwrap().transform.as_deref(),
Some("plan.bin")
);
let text = serde_json::to_string(&ds).unwrap();
// The first occurrence of each key in the serialized text reflects source order.
let repos_at = text.find("\"repos\"").unwrap();
let owners_at = text.find("\"owners\"").unwrap();
assert!(repos_at < owners_at, "authored source order not preserved");
let ds2: Dataset = serde_json::from_str(&text).unwrap();
let s2 = ds2.sources.as_ref().unwrap();
assert_eq!(s2.keys().collect::<Vec<_>>(), ["repos", "owners"]);
}
}