mini_app_core/alias_run.rs
1//! Top-level orchestration for the `alias_run` MCP tool.
2//!
3//! This module exposes [`execute_alias_run`] as the single entry point for
4//! running a named alias. Both the MCP tool handler and direct SDK consumers
5//! call this function; the MCP handler is a thin wrapper that:
6//! 1. Resolves the [`AliasRecord`] from global or per-table storage.
7//! 2. Calls [`execute_alias_run`].
8//! 3. Serialises the [`AliasRunValue`] result to JSON (backward-compat shape).
9//!
10//! # Crux compliance
11//!
12//! - **Crux #1 / #2**: `crates/core/Cargo.toml` must not declare `rmcp` as a
13//! dependency. All types here are pure Core-native types. MCP boundary
14//! conversions (e.g. `MiniAppError → rmcp::ErrorData`) are the sole
15//! responsibility of private adapter functions in `crates/mcp`.
16//!
17//! # MiniJinja render pipeline (Crux §1/#2)
18//!
19//! When `record.params_schema` is `Some`, the `filter` field is a MiniJinja
20//! template. The template is rendered with `params` as the context, then the
21//! rendered string is parsed as a [`ListFilter`] JSON document. When
22//! `params_schema` is `None`, the render step is skipped entirely for backward
23//! compatibility with plain-JSON aliases.
24
25use std::sync::Arc;
26
27use serde::Serialize;
28
29use crate::aggregator::{AliasAggregator, AliasRunResult, SourceSpec, execute_aggregate};
30use crate::alias_storage::AliasRecord;
31use crate::error::MiniAppError;
32use crate::filter::ListFilter;
33use crate::materialize::{FieldSelector, apply_projection};
34use crate::registry::TableRegistry;
35
36// =============================================================================
37// Result type
38// =============================================================================
39
40/// The result of [`execute_alias_run`].
41///
42/// Wraps both the plain Rows path (per-table `store.list` + field projection)
43/// and the Aggregator path (`execute_aggregate`). MCP callers serialise each
44/// variant with its natural JSON shape to preserve backward compatibility:
45///
46/// - `Rows(records)` → `serde_json::to_string(&records)`
47/// - `Aggregate(result)` → `serde_json::to_string(&result)`
48#[derive(Debug, Serialize)]
49pub enum AliasRunValue {
50 /// Plain rows path — result of `store.list` + field projection.
51 Rows(Vec<crate::store::RowRecord>),
52 /// Aggregator path — wraps the existing [`AliasRunResult`].
53 Aggregate(AliasRunResult),
54}
55
56// =============================================================================
57// Public entry point
58// =============================================================================
59
60/// Execute an alias and return the typed result.
61///
62/// This is the canonical alias_run implementation. Both the MCP `alias_run`
63/// tool handler and direct SDK consumers call this function.
64///
65/// # Arguments
66///
67/// - `registry` — Live [`TableRegistry`]; used to resolve store handles and
68/// schema configs from table names, and to collect table names for
69/// [`SourceSpec::Pattern`] resolution.
70/// - `record` — The [`AliasRecord`] that describes the alias (sources,
71/// aggregator, filter template, parameter schema, default limit).
72/// - `params` — Optional JSON value used as the MiniJinja render context.
73/// Required when `record.params_schema` is `Some`; ignored (and silently
74/// accepted) when `None`.
75/// - `table_fallback` — Legacy single-table mode: the `table` argument
76/// supplied to `alias_run`. Used when `record.sources` is
77/// `SourceSpec::Single` with an empty placeholder produced by the legacy
78/// per-table path. Ignored when `record.sources` is already a fully
79/// populated `Single`/`Multi`/`Pattern`.
80/// - `limit_override` — Caller-supplied row limit. Falls back to
81/// `record.default_limit` when `None`.
82/// - `offset` — Number of rows to skip (plain Rows path only).
83/// - `fields` — Field projection selector (plain Rows path only).
84///
85/// # Errors
86///
87/// - [`MiniAppError::AliasParamsRequired`] — `params_schema` is `Some` but
88/// `params` is `None`.
89/// - [`MiniAppError::AliasTemplateError`] — MiniJinja render failure.
90/// - [`MiniAppError::Filter`] — filter parse or validate failure.
91/// - [`MiniAppError::Aggregator`] — aggregator execute failure.
92/// - [`MiniAppError::TableNotFound`] — a referenced table is not in the
93/// registry.
94/// - [`MiniAppError::Storage`] — underlying SQLite error.
95pub async fn execute_alias_run(
96 registry: &TableRegistry,
97 record: AliasRecord,
98 params: Option<serde_json::Value>,
99 table_fallback: Option<&str>,
100 limit_override: Option<u32>,
101 offset: Option<u32>,
102 fields: Option<FieldSelector>,
103) -> Result<AliasRunValue, MiniAppError> {
104 // -----------------------------------------------------------------
105 // Step 1: Render the filter template (Crux #1/#2).
106 // -----------------------------------------------------------------
107 let filter_text = record.filter;
108 let filter: ListFilter = if record.params_schema.is_some() {
109 let mut params_value = params.ok_or_else(|| MiniAppError::AliasParamsRequired {
110 name: record.name.clone(),
111 })?;
112 // Defensive parse: some MCP transports (notably the Claude Code
113 // stdio client) stringify `serde_json::Value` argument fields, so
114 // a `params: {"k": "v"}` payload arrives here as
115 // `Value::String("{\"k\":\"v\"}")` rather than `Value::Object(..)`.
116 // When minijinja receives a String it cannot resolve `{{ k }}`
117 // (the String has no keys), the placeholder evaluates to undefined
118 // and lenient render emits an empty value. Re-parse JSON-encoded
119 // strings into their Object form so render works regardless of
120 // the transport.
121 if let serde_json::Value::String(ref s) = params_value {
122 params_value = serde_json::from_str(s).map_err(|e| {
123 MiniAppError::AliasTemplateError(format!(
124 "params arrived as a string but failed JSON parse: {e}"
125 ))
126 })?;
127 }
128 let env = minijinja::Environment::new();
129 let rendered = env
130 .render_str(&filter_text, ¶ms_value)
131 .map_err(|e| MiniAppError::AliasTemplateError(e.to_string()))?;
132 serde_json::from_str(&rendered)
133 .map_err(|e| MiniAppError::Schema(format!("rendered filter parse error: {e}")))?
134 } else {
135 serde_json::from_str(&filter_text)
136 .map_err(|e| MiniAppError::Schema(format!("filter parse error: {e}")))?
137 };
138
139 let limit = limit_override.or(record.default_limit);
140
141 // -----------------------------------------------------------------
142 // Crux #2: Field-projection fallback.
143 //
144 // When the caller supplies `fields` at run-time, use it as-is.
145 // When the caller omits `fields` (None), fall back to the stored
146 // default from `record.fields`.
147 // When `record.fields` is also None, no projection is applied — all
148 // fields are returned (Crux #3: NULL must never become an empty list).
149 // -----------------------------------------------------------------
150 let fields = match fields {
151 Some(f) => Some(f),
152 None => match record.fields.as_deref() {
153 Some(json) => Some(serde_json::from_str(json).map_err(|e| {
154 MiniAppError::Schema(format!(
155 "alias '{}' stored fields parse error: {e}",
156 record.name
157 ))
158 })?),
159 None => None,
160 },
161 };
162
163 // -----------------------------------------------------------------
164 // Step 2: Aggregator path dispatch.
165 // -----------------------------------------------------------------
166 if let Some(agg) = record.aggregator {
167 return execute_aggregator_path(registry, record.sources, filter, agg, limit).await;
168 }
169
170 // -----------------------------------------------------------------
171 // Step 3: Plain (Rows) path.
172 // -----------------------------------------------------------------
173 execute_rows_path(
174 registry,
175 record.sources,
176 table_fallback,
177 filter,
178 limit,
179 offset,
180 fields,
181 )
182 .await
183}
184
185// =============================================================================
186// Internal helpers
187// =============================================================================
188
189/// Aggregator execution path: resolves Pattern sources, validates, and calls
190/// [`execute_aggregate`].
191async fn execute_aggregator_path(
192 registry: &TableRegistry,
193 sources: SourceSpec,
194 filter: ListFilter,
195 agg: AliasAggregator,
196 _limit: Option<u32>,
197) -> Result<AliasRunValue, MiniAppError> {
198 let resolved = if sources.requires_resolve() {
199 let table_names: Vec<String> = registry.table_names().map(str::to_owned).collect();
200 sources.resolve_pattern(&table_names)?
201 } else {
202 sources
203 };
204
205 let schema_table =
206 resolved.tables().first().cloned().ok_or_else(|| {
207 MiniAppError::Aggregator("alias sources resolved to zero tables".into())
208 })?;
209
210 let schema = Arc::clone(®istry.resolve(Some(schema_table.as_str()))?.schema);
211
212 filter.validate(&schema)?;
213
214 let result = execute_aggregate(registry, resolved, Some(filter), agg, &schema).await?;
215 Ok(AliasRunValue::Aggregate(result))
216}
217
218/// Plain rows execution path: resolves a single-table store and returns rows
219/// after applying optional field projection.
220async fn execute_rows_path(
221 registry: &TableRegistry,
222 sources: SourceSpec,
223 table_fallback: Option<&str>,
224 filter: ListFilter,
225 limit: Option<u32>,
226 offset: Option<u32>,
227 fields: Option<FieldSelector>,
228) -> Result<AliasRunValue, MiniAppError> {
229 let table_name: Option<&str> = match &sources {
230 // Non-empty Single → use it directly.
231 // Empty-string Single is the legacy sentinel produced by the MCP wrapper
232 // when reading from per-table `_aliases`; fall through to `table_fallback`.
233 SourceSpec::Single(t) if !t.is_empty() => Some(t.as_str()),
234 SourceSpec::Single(_) => None,
235 SourceSpec::Multi(_) | SourceSpec::Pattern(_) => {
236 return Err(MiniAppError::Aggregator(
237 "Multi/Pattern source aliases require an aggregator (Phase 2 limitation)".into(),
238 ));
239 }
240 };
241
242 // Use `table_name` from sources when available; otherwise fall back to
243 // the legacy `table_fallback` arg (per-table alias_run path).
244 let effective_table = table_name.or(table_fallback);
245
246 let entry = registry.resolve(effective_table)?;
247 let store = Arc::clone(&entry.store);
248 let schema = Arc::clone(&entry.schema);
249
250 filter.validate(&schema)?;
251 let records = store.list(limit, offset, Some(filter)).await?;
252 let records = apply_projection(records, &fields, &schema)?;
253 Ok(AliasRunValue::Rows(records))
254}
255
256// =============================================================================
257// Tests
258// =============================================================================
259
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tempfile::{TempDir, tempdir};

    use super::*;
    use crate::aggregator::{AliasAggregator, SourceSpec};
    use crate::alias_storage::AliasRecord;
    use crate::registry::{TableEntry, TableRegistry};
    use crate::schema::{FieldDef, FieldType, SchemaConfig};
    use crate::store::Store;

    // -----------------------------------------------------------------------
    // Test helpers
    // -----------------------------------------------------------------------

    /// Plain-JSON filter matching rows whose `status` is `"open"`.
    /// ListFilter uses the `{"type":"eq",...}` shape.
    const OPEN_FILTER: &str = r#"{"type":"eq","field":"status","value":"open"}"#;

    /// MiniJinja filter template that substitutes `{{ status }}` from params.
    const STATUS_TEMPLATE: &str = r#"{"type":"eq","field":"status","value":"{{ status }}"}"#;

    /// A single-table registry together with the temp directory backing its
    /// database file.
    ///
    /// Fields drop in declaration order, so the registry (and the store it
    /// owns) is released before the directory is removed. Holding the guard
    /// here replaces the previous `std::mem::forget`, which left one temp
    /// directory on disk per test run.
    struct Fixture {
        registry: TableRegistry,
        _db_dir: TempDir,
    }

    /// Minimal schema with a `status` string field (direct struct construction).
    fn status_schema() -> SchemaConfig {
        SchemaConfig {
            table: "items".into(),
            title: None,
            description: None,
            fields: vec![FieldDef {
                name: "status".into(),
                ty: FieldType::String,
                required: false,
                description: None,
            }],
            dump: None,
        }
    }

    /// Build a tempfile-backed Store with seed rows.
    ///
    /// The returned [`TempDir`] must be kept alive for as long as the store is
    /// in use; dropping it deletes the database directory.
    async fn make_store_with_rows(
        schema: &SchemaConfig,
        rows: Vec<serde_json::Value>,
    ) -> (Store, TempDir) {
        let dir = tempdir().expect("tempdir");
        let db_path = dir.path().join("test.db");
        let store = Store::open(&db_path, schema.clone())
            .await
            .expect("store open");
        for row in rows {
            store.create(row).await.expect("insert row");
        }
        (store, dir)
    }

    /// Build a single-table [`TableRegistry`] from a store + schema.
    fn registry_from_store(table: &str, store: Store, schema: SchemaConfig) -> TableRegistry {
        let mut entries = HashMap::new();
        entries.insert(
            table.to_string(),
            TableEntry {
                store: Arc::new(store),
                schema: Arc::new(schema),
                schema_path: Arc::new(std::path::PathBuf::new()),
            },
        );
        TableRegistry::from_entries(entries, Some(table.to_string()))
    }

    /// Build the `items` registry used by every test, seeded with `rows`.
    async fn items_fixture(rows: Vec<serde_json::Value>) -> Fixture {
        let schema = status_schema();
        let (store, db_dir) = make_store_with_rows(&schema, rows).await;
        Fixture {
            registry: registry_from_store("items", store, schema),
            _db_dir: db_dir,
        }
    }

    /// Build a minimal [`AliasRecord`] (no aggregator, no params, no stored fields).
    fn plain_alias(sources: SourceSpec, filter_json: &str) -> AliasRecord {
        AliasRecord {
            name: "test_alias".into(),
            sources,
            aggregator: None,
            filter: filter_json.into(),
            default_limit: None,
            description: None,
            params_schema: None,
            fields: None,
            scope: None,
        }
    }

    /// Build a templated alias over `items` whose filter is [`STATUS_TEMPLATE`].
    fn templated_alias() -> AliasRecord {
        AliasRecord {
            name: "templated_alias".into(),
            params_schema: Some(r#"["status"]"#.into()),
            ..plain_alias(SourceSpec::Single("items".into()), STATUS_TEMPLATE)
        }
    }

    /// Assert that `result` is the Rows variant holding exactly one row whose
    /// `status` equals `expected`.
    #[track_caller]
    fn assert_single_row_with_status(result: AliasRunValue, expected: &str) {
        match result {
            AliasRunValue::Rows(rows) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(rows[0].data["status"], expected);
            }
            other => panic!("expected Rows, got {other:?}"),
        }
    }

    // -----------------------------------------------------------------------
    // Test: Rows path — Single source + plain filter
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn rows_path_single_source() {
        let fx = items_fixture(vec![
            serde_json::json!({"status": "open"}),
            serde_json::json!({"status": "closed"}),
        ])
        .await;

        // Filter: only "open" rows.
        let record = plain_alias(SourceSpec::Single("items".into()), OPEN_FILTER);

        let result = execute_alias_run(&fx.registry, record, None, None, None, None, None)
            .await
            .expect("execute_alias_run");

        assert_single_row_with_status(result, "open");
    }

    // -----------------------------------------------------------------------
    // Test: Aggregator path — Count
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn aggregator_path_count() {
        let fx = items_fixture(vec![
            serde_json::json!({"status": "open"}),
            serde_json::json!({"status": "open"}),
            serde_json::json!({"status": "closed"}),
        ])
        .await;

        // Use an "open" status filter to match 2 of 3 rows.
        let record = AliasRecord {
            name: "count_alias".into(),
            aggregator: Some(AliasAggregator::Count),
            ..plain_alias(SourceSpec::Single("items".into()), OPEN_FILTER)
        };

        let result = execute_alias_run(&fx.registry, record, None, None, None, None, None)
            .await
            .expect("execute_alias_run");

        match result {
            AliasRunValue::Aggregate(AliasRunResult::Count(n)) => assert_eq!(n, 2),
            other => panic!("expected Aggregate(Count(2)), got {other:?}"),
        }
    }

    // -----------------------------------------------------------------------
    // Test: MiniJinja render — params_schema=Some, template substitution
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn jinja_render_substitution() {
        let fx = items_fixture(vec![
            serde_json::json!({"status": "open"}),
            serde_json::json!({"status": "closed"}),
        ])
        .await;

        let params = serde_json::json!({"status": "closed"});

        let result = execute_alias_run(
            &fx.registry,
            templated_alias(),
            Some(params),
            None,
            None,
            None,
            None,
        )
        .await
        .expect("execute_alias_run");

        assert_single_row_with_status(result, "closed");
    }

    // -----------------------------------------------------------------------
    // Test: MCP transport stringified params — Value::String("{...}") path
    //
    // Some MCP transports (notably the Claude Code stdio client) deliver
    // `Option<serde_json::Value>` argument fields as JSON-encoded strings
    // rather than parsed objects. Verify the defensive re-parse path so
    // `{{ key }}` still resolves when params arrives as Value::String.
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn jinja_render_with_stringified_params() {
        let fx = items_fixture(vec![
            serde_json::json!({"status": "open"}),
            serde_json::json!({"status": "closed"}),
        ])
        .await;

        // params arrives as Value::String containing a JSON-encoded object
        // (the failure mode observed via the Claude Code MCP transport).
        let stringified_params = serde_json::Value::String(r#"{"status": "closed"}"#.to_string());

        let result = execute_alias_run(
            &fx.registry,
            templated_alias(),
            Some(stringified_params),
            None,
            None,
            None,
            None,
        )
        .await
        .expect("execute_alias_run must re-parse stringified params");

        assert_single_row_with_status(result, "closed");
    }

    // -----------------------------------------------------------------------
    // Test: Legacy mode fallback — sources Single("") + table_fallback
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn legacy_mode_table_fallback() {
        let fx = items_fixture(vec![serde_json::json!({"status": "open"})]).await;

        // Simulate a legacy record where `sources` has an empty Single table
        // name and `table_fallback` provides the real name.
        let record = AliasRecord {
            name: "legacy_alias".into(),
            ..plain_alias(SourceSpec::Single(String::new()), OPEN_FILTER)
        };

        let result = execute_alias_run(
            &fx.registry,
            record,
            None,
            Some("items"),
            None,
            None,
            None,
        )
        .await
        .expect("execute_alias_run");

        match result {
            AliasRunValue::Rows(rows) => assert!(!rows.is_empty()),
            other => panic!("expected Rows, got {other:?}"),
        }
    }

    // -----------------------------------------------------------------------
    // Test: Multi/Pattern without aggregator → error
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn multi_without_aggregator_is_error() {
        let fx = items_fixture(vec![]).await;

        let record = AliasRecord {
            name: "multi_alias".into(),
            ..plain_alias(
                SourceSpec::Multi(vec!["items".into(), "other".into()]),
                OPEN_FILTER,
            )
        };

        let err = execute_alias_run(&fx.registry, record, None, None, None, None, None)
            .await
            .expect_err("should fail");

        let msg = err.to_string();
        assert!(
            msg.contains("Multi/Pattern source aliases require an aggregator"),
            "unexpected error: {msg}"
        );
    }
}