use std::sync::Arc;
use serde::Serialize;
use crate::aggregator::{AliasAggregator, AliasRunResult, SourceSpec, execute_aggregate};
use crate::alias_storage::AliasRecord;
use crate::error::MiniAppError;
use crate::filter::ListFilter;
use crate::materialize::{FieldSelector, apply_projection};
use crate::registry::TableRegistry;
#[derive(Debug, Serialize)]
pub enum AliasRunValue {
Rows(Vec<crate::store::RowRecord>),
Aggregate(AliasRunResult),
}
pub async fn execute_alias_run(
registry: &TableRegistry,
record: AliasRecord,
params: Option<serde_json::Value>,
table_fallback: Option<&str>,
limit_override: Option<u32>,
offset: Option<u32>,
fields: Option<FieldSelector>,
) -> Result<AliasRunValue, MiniAppError> {
let filter_text = record.filter;
let filter: ListFilter = if record.params_schema.is_some() {
let mut params_value = params.ok_or_else(|| MiniAppError::AliasParamsRequired {
name: record.name.clone(),
})?;
if let serde_json::Value::String(ref s) = params_value {
params_value = serde_json::from_str(s).map_err(|e| {
MiniAppError::AliasTemplateError(format!(
"params arrived as a string but failed JSON parse: {e}"
))
})?;
}
let env = minijinja::Environment::new();
let rendered = env
.render_str(&filter_text, ¶ms_value)
.map_err(|e| MiniAppError::AliasTemplateError(e.to_string()))?;
serde_json::from_str(&rendered)
.map_err(|e| MiniAppError::Schema(format!("rendered filter parse error: {e}")))?
} else {
serde_json::from_str(&filter_text)
.map_err(|e| MiniAppError::Schema(format!("filter parse error: {e}")))?
};
let limit = limit_override.or(record.default_limit);
let fields = match fields {
Some(f) => Some(f),
None => match record.fields.as_deref() {
Some(json) => Some(serde_json::from_str(json).map_err(|e| {
MiniAppError::Schema(format!(
"alias '{}' stored fields parse error: {e}",
record.name
))
})?),
None => None,
},
};
if let Some(agg) = record.aggregator {
return execute_aggregator_path(registry, record.sources, filter, agg, limit).await;
}
execute_rows_path(
registry,
record.sources,
table_fallback,
filter,
limit,
offset,
fields,
)
.await
}
async fn execute_aggregator_path(
registry: &TableRegistry,
sources: SourceSpec,
filter: ListFilter,
agg: AliasAggregator,
_limit: Option<u32>,
) -> Result<AliasRunValue, MiniAppError> {
let resolved = if sources.requires_resolve() {
let table_names: Vec<String> = registry.table_names().map(str::to_owned).collect();
sources.resolve_pattern(&table_names)?
} else {
sources
};
let schema_table =
resolved.tables().first().cloned().ok_or_else(|| {
MiniAppError::Aggregator("alias sources resolved to zero tables".into())
})?;
let schema = Arc::clone(®istry.resolve(Some(schema_table.as_str()))?.schema);
filter.validate(&schema)?;
let result = execute_aggregate(registry, resolved, Some(filter), agg, &schema).await?;
Ok(AliasRunValue::Aggregate(result))
}
async fn execute_rows_path(
registry: &TableRegistry,
sources: SourceSpec,
table_fallback: Option<&str>,
filter: ListFilter,
limit: Option<u32>,
offset: Option<u32>,
fields: Option<FieldSelector>,
) -> Result<AliasRunValue, MiniAppError> {
let table_name: Option<&str> = match &sources {
SourceSpec::Single(t) if !t.is_empty() => Some(t.as_str()),
SourceSpec::Single(_) => None,
SourceSpec::Multi(_) | SourceSpec::Pattern(_) => {
return Err(MiniAppError::Aggregator(
"Multi/Pattern source aliases require an aggregator (Phase 2 limitation)".into(),
));
}
};
let effective_table = table_name.or(table_fallback);
let entry = registry.resolve(effective_table)?;
let store = Arc::clone(&entry.store);
let schema = Arc::clone(&entry.schema);
filter.validate(&schema)?;
let records = store.list(limit, offset, Some(filter)).await?;
let records = apply_projection(records, &fields, &schema)?;
Ok(AliasRunValue::Rows(records))
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use tempfile::tempdir;
use super::*;
use crate::aggregator::{AliasAggregator, SourceSpec};
use crate::alias_storage::AliasRecord;
use crate::registry::{TableEntry, TableRegistry};
use crate::schema::{FieldDef, FieldType, SchemaConfig};
use crate::store::Store;
fn status_schema() -> SchemaConfig {
SchemaConfig {
table: "items".into(),
title: None,
description: None,
fields: vec![FieldDef {
name: "status".into(),
ty: FieldType::String,
required: false,
description: None,
}],
dump: None,
}
}
async fn make_store_with_rows(schema: &SchemaConfig, rows: Vec<serde_json::Value>) -> Store {
let dir = tempdir().expect("tempdir");
let db_path = dir.path().join("test.db");
let store = Store::open(&db_path, schema.clone())
.await
.expect("store open");
std::mem::forget(dir);
for row in rows {
store.create(row).await.expect("insert row");
}
store
}
fn registry_from_store(table: &str, store: Store, schema: SchemaConfig) -> TableRegistry {
let mut entries = HashMap::new();
entries.insert(
table.to_string(),
TableEntry {
store: Arc::new(store),
schema: Arc::new(schema),
schema_path: Arc::new(std::path::PathBuf::new()),
},
);
TableRegistry::from_entries(entries, Some(table.to_string()))
}
fn plain_alias(sources: SourceSpec, filter_json: &str) -> AliasRecord {
AliasRecord {
name: "test_alias".into(),
sources,
aggregator: None,
filter: filter_json.into(),
default_limit: None,
description: None,
params_schema: None,
fields: None,
scope: None,
}
}
#[tokio::test]
async fn rows_path_single_source() {
let schema = status_schema();
let store = make_store_with_rows(
&schema,
vec![
serde_json::json!({"status": "open"}),
serde_json::json!({"status": "closed"}),
],
)
.await;
let registry = registry_from_store("items", store, schema);
let filter_json = r#"{"type":"eq","field":"status","value":"open"}"#;
let record = plain_alias(SourceSpec::Single("items".into()), filter_json);
let result = execute_alias_run(®istry, record, None, None, None, None, None)
.await
.expect("execute_alias_run");
match result {
AliasRunValue::Rows(rows) => {
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].data["status"], "open");
}
other => panic!("expected Rows, got {other:?}"),
}
}
#[tokio::test]
async fn aggregator_path_count() {
let schema = status_schema();
let store = make_store_with_rows(
&schema,
vec![
serde_json::json!({"status": "open"}),
serde_json::json!({"status": "open"}),
serde_json::json!({"status": "closed"}),
],
)
.await;
let registry = registry_from_store("items", store, schema);
let record = AliasRecord {
name: "count_alias".into(),
sources: SourceSpec::Single("items".into()),
aggregator: Some(AliasAggregator::Count),
filter: r#"{"type":"eq","field":"status","value":"open"}"#.into(),
default_limit: None,
description: None,
params_schema: None,
fields: None,
scope: None,
};
let result = execute_alias_run(®istry, record, None, None, None, None, None)
.await
.expect("execute_alias_run");
match result {
AliasRunValue::Aggregate(AliasRunResult::Count(n)) => assert_eq!(n, 2),
other => panic!("expected Aggregate(Count(2)), got {other:?}"),
}
}
#[tokio::test]
async fn jinja_render_substitution() {
let schema = status_schema();
let store = make_store_with_rows(
&schema,
vec![
serde_json::json!({"status": "open"}),
serde_json::json!({"status": "closed"}),
],
)
.await;
let registry = registry_from_store("items", store, schema);
let record = AliasRecord {
name: "templated_alias".into(),
sources: SourceSpec::Single("items".into()),
aggregator: None,
filter: r#"{"type":"eq","field":"status","value":"{{ status }}"}"#.into(),
default_limit: None,
description: None,
params_schema: Some(r#"["status"]"#.into()),
fields: None,
scope: None,
};
let params = serde_json::json!({"status": "closed"});
let result = execute_alias_run(®istry, record, Some(params), None, None, None, None)
.await
.expect("execute_alias_run");
match result {
AliasRunValue::Rows(rows) => {
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].data["status"], "closed");
}
other => panic!("expected Rows, got {other:?}"),
}
}
#[tokio::test]
async fn jinja_render_with_stringified_params() {
let schema = status_schema();
let store = make_store_with_rows(
&schema,
vec![
serde_json::json!({"status": "open"}),
serde_json::json!({"status": "closed"}),
],
)
.await;
let registry = registry_from_store("items", store, schema);
let record = AliasRecord {
name: "templated_alias".into(),
sources: SourceSpec::Single("items".into()),
aggregator: None,
filter: r#"{"type":"eq","field":"status","value":"{{ status }}"}"#.into(),
default_limit: None,
description: None,
params_schema: Some(r#"["status"]"#.into()),
fields: None,
scope: None,
};
let stringified_params = serde_json::Value::String(r#"{"status": "closed"}"#.to_string());
let result = execute_alias_run(
®istry,
record,
Some(stringified_params),
None,
None,
None,
None,
)
.await
.expect("execute_alias_run must re-parse stringified params");
match result {
AliasRunValue::Rows(rows) => {
assert_eq!(rows.len(), 1);
assert_eq!(rows[0].data["status"], "closed");
}
other => panic!("expected Rows, got {other:?}"),
}
}
#[tokio::test]
async fn legacy_mode_table_fallback() {
let schema = status_schema();
let store =
make_store_with_rows(&schema, vec![serde_json::json!({"status": "open"})]).await;
let registry = registry_from_store("items", store, schema);
let record = AliasRecord {
name: "legacy_alias".into(),
sources: SourceSpec::Single(String::new()),
aggregator: None,
filter: r#"{"type":"eq","field":"status","value":"open"}"#.into(),
default_limit: None,
description: None,
params_schema: None,
fields: None,
scope: None,
};
let result = execute_alias_run(®istry, record, None, Some("items"), None, None, None)
.await
.expect("execute_alias_run");
match result {
AliasRunValue::Rows(rows) => assert!(!rows.is_empty()),
other => panic!("expected Rows, got {other:?}"),
}
}
#[tokio::test]
async fn multi_without_aggregator_is_error() {
let schema = status_schema();
let store = make_store_with_rows(&schema, vec![]).await;
let registry = registry_from_store("items", store, schema);
let record = AliasRecord {
name: "multi_alias".into(),
sources: SourceSpec::Multi(vec!["items".into(), "other".into()]),
aggregator: None,
filter: r#"{"type":"eq","field":"status","value":"open"}"#.into(),
default_limit: None,
description: None,
params_schema: None,
fields: None,
scope: None,
};
let err = execute_alias_run(®istry, record, None, None, None, None, None)
.await
.expect_err("should fail");
let msg = err.to_string();
assert!(
msg.contains("Multi/Pattern source aliases require an aggregator"),
"unexpected error: {msg}"
);
}
}