mini_app_core/alias_run.rs
1//! Top-level orchestration for the `alias_run` MCP tool.
2//!
3//! This module exposes [`execute_alias_run`] as the single entry point for
4//! running a named alias. Both the MCP tool handler and direct SDK consumers
5//! call this function; the MCP handler is a thin wrapper that:
6//! 1. Resolves the [`AliasRecord`] from global or per-table storage.
7//! 2. Calls [`execute_alias_run`].
8//! 3. Serialises the [`AliasRunValue`] result to JSON (backward-compat shape).
9//!
10//! # Crux compliance
11//!
12//! - **Crux #1 / #2**: `crates/core/Cargo.toml` must not declare `rmcp` as a
13//! dependency. All types here are pure Core-native types. MCP boundary
14//! conversions (e.g. `MiniAppError → rmcp::ErrorData`) are the sole
15//! responsibility of private adapter functions in `crates/mcp`.
16//!
17//! # MiniJinja render pipeline (Crux §1/#2)
18//!
19//! When `record.params_schema` is `Some`, the `filter` field is a MiniJinja
20//! template. The template is rendered with `params` as the context, then the
21//! rendered string is parsed as a [`ListFilter`] JSON document. When
22//! `params_schema` is `None`, the render step is skipped entirely for backward
23//! compatibility with plain-JSON aliases.
24
25use std::sync::Arc;
26
27use serde::Serialize;
28
29use crate::aggregator::{AliasAggregator, AliasRunResult, SourceSpec, execute_aggregate};
30use crate::alias_storage::AliasRecord;
31use crate::error::MiniAppError;
32use crate::filter::ListFilter;
33use crate::materialize::{FieldSelector, apply_projection};
34use crate::registry::TableRegistry;
35
36// =============================================================================
37// Result type
38// =============================================================================
39
40/// The result of [`execute_alias_run`].
41///
42/// Wraps both the plain Rows path (per-table `store.list` + field projection)
43/// and the Aggregator path (`execute_aggregate`). MCP callers serialise each
44/// variant with its natural JSON shape to preserve backward compatibility:
45///
46/// - `Rows(records)` → `serde_json::to_string(&records)`
47/// - `Aggregate(result)` → `serde_json::to_string(&result)`
48#[derive(Debug, Serialize)]
49pub enum AliasRunValue {
50 /// Plain rows path — result of `store.list` + field projection.
51 Rows(Vec<crate::store::RowRecord>),
52 /// Aggregator path — wraps the existing [`AliasRunResult`].
53 Aggregate(AliasRunResult),
54}
55
56// =============================================================================
57// Public entry point
58// =============================================================================
59
60/// Execute an alias and return the typed result.
61///
62/// This is the canonical alias_run implementation. Both the MCP `alias_run`
63/// tool handler and direct SDK consumers call this function.
64///
65/// # Arguments
66///
67/// - `registry` — Live [`TableRegistry`]; used to resolve store handles and
68/// schema configs from table names, and to collect table names for
69/// [`SourceSpec::Pattern`] resolution.
70/// - `record` — The [`AliasRecord`] that describes the alias (sources,
71/// aggregator, filter template, parameter schema, default limit).
72/// - `params` — Optional JSON value used as the MiniJinja render context.
73/// Required when `record.params_schema` is `Some`; ignored (and silently
74/// accepted) when `None`.
75/// - `table_fallback` — Legacy single-table mode: the `table` argument
76/// supplied to `alias_run`. Used when `record.sources` is
77/// `SourceSpec::Single` with an empty placeholder produced by the legacy
78/// per-table path. Ignored when `record.sources` is already a fully
79/// populated `Single`/`Multi`/`Pattern`.
80/// - `limit_override` — Caller-supplied row limit. Falls back to
81/// `record.default_limit` when `None`.
82/// - `offset` — Number of rows to skip (plain Rows path only).
83/// - `fields` — Field projection selector (plain Rows path only).
84///
85/// # Errors
86///
87/// - [`MiniAppError::AliasParamsRequired`] — `params_schema` is `Some` but
88/// `params` is `None`.
89/// - [`MiniAppError::AliasTemplateError`] — MiniJinja render failure.
90/// - [`MiniAppError::Filter`] — filter parse or validate failure.
91/// - [`MiniAppError::Aggregator`] — aggregator execute failure.
92/// - [`MiniAppError::TableNotFound`] — a referenced table is not in the
93/// registry.
94/// - [`MiniAppError::Storage`] — underlying SQLite error.
95pub async fn execute_alias_run(
96 registry: &TableRegistry,
97 record: AliasRecord,
98 params: Option<serde_json::Value>,
99 table_fallback: Option<&str>,
100 limit_override: Option<u32>,
101 offset: Option<u32>,
102 fields: Option<FieldSelector>,
103) -> Result<AliasRunValue, MiniAppError> {
104 // -----------------------------------------------------------------
105 // Step 1: Render the filter template (Crux #1/#2).
106 // -----------------------------------------------------------------
107 let filter_text = record.filter;
108 let filter: ListFilter = if record.params_schema.is_some() {
109 let mut params_value = params.ok_or_else(|| MiniAppError::AliasParamsRequired {
110 name: record.name.clone(),
111 })?;
112 // Defensive parse: some MCP transports (notably the Claude Code
113 // stdio client) stringify `serde_json::Value` argument fields, so
114 // a `params: {"k": "v"}` payload arrives here as
115 // `Value::String("{\"k\":\"v\"}")` rather than `Value::Object(..)`.
116 // When minijinja receives a String it cannot resolve `{{ k }}`
117 // (the String has no keys), the placeholder evaluates to undefined
118 // and lenient render emits an empty value. Re-parse JSON-encoded
119 // strings into their Object form so render works regardless of
120 // the transport.
121 if let serde_json::Value::String(ref s) = params_value {
122 params_value = serde_json::from_str(s).map_err(|e| {
123 MiniAppError::AliasTemplateError(format!(
124 "params arrived as a string but failed JSON parse: {e}"
125 ))
126 })?;
127 }
128 let env = minijinja::Environment::new();
129 let rendered = env
130 .render_str(&filter_text, ¶ms_value)
131 .map_err(|e| MiniAppError::AliasTemplateError(e.to_string()))?;
132 serde_json::from_str(&rendered)
133 .map_err(|e| MiniAppError::Schema(format!("rendered filter parse error: {e}")))?
134 } else {
135 serde_json::from_str(&filter_text)
136 .map_err(|e| MiniAppError::Schema(format!("filter parse error: {e}")))?
137 };
138
139 let limit = limit_override.or(record.default_limit);
140
141 // -----------------------------------------------------------------
142 // Step 2: Aggregator path dispatch.
143 // -----------------------------------------------------------------
144 if let Some(agg) = record.aggregator {
145 return execute_aggregator_path(registry, record.sources, filter, agg, limit).await;
146 }
147
148 // -----------------------------------------------------------------
149 // Step 3: Plain (Rows) path.
150 // -----------------------------------------------------------------
151 execute_rows_path(
152 registry,
153 record.sources,
154 table_fallback,
155 filter,
156 limit,
157 offset,
158 fields,
159 )
160 .await
161}
162
163// =============================================================================
164// Internal helpers
165// =============================================================================
166
167/// Aggregator execution path: resolves Pattern sources, validates, and calls
168/// [`execute_aggregate`].
169async fn execute_aggregator_path(
170 registry: &TableRegistry,
171 sources: SourceSpec,
172 filter: ListFilter,
173 agg: AliasAggregator,
174 _limit: Option<u32>,
175) -> Result<AliasRunValue, MiniAppError> {
176 let resolved = if sources.requires_resolve() {
177 let table_names: Vec<String> = registry.table_names().map(str::to_owned).collect();
178 sources.resolve_pattern(&table_names)?
179 } else {
180 sources
181 };
182
183 let schema_table =
184 resolved.tables().first().cloned().ok_or_else(|| {
185 MiniAppError::Aggregator("alias sources resolved to zero tables".into())
186 })?;
187
188 let schema = Arc::clone(®istry.resolve(Some(schema_table.as_str()))?.schema);
189
190 filter.validate(&schema)?;
191
192 let result = execute_aggregate(registry, resolved, Some(filter), agg, &schema).await?;
193 Ok(AliasRunValue::Aggregate(result))
194}
195
196/// Plain rows execution path: resolves a single-table store and returns rows
197/// after applying optional field projection.
198async fn execute_rows_path(
199 registry: &TableRegistry,
200 sources: SourceSpec,
201 table_fallback: Option<&str>,
202 filter: ListFilter,
203 limit: Option<u32>,
204 offset: Option<u32>,
205 fields: Option<FieldSelector>,
206) -> Result<AliasRunValue, MiniAppError> {
207 let table_name: Option<&str> = match &sources {
208 // Non-empty Single → use it directly.
209 // Empty-string Single is the legacy sentinel produced by the MCP wrapper
210 // when reading from per-table `_aliases`; fall through to `table_fallback`.
211 SourceSpec::Single(t) if !t.is_empty() => Some(t.as_str()),
212 SourceSpec::Single(_) => None,
213 SourceSpec::Multi(_) | SourceSpec::Pattern(_) => {
214 return Err(MiniAppError::Aggregator(
215 "Multi/Pattern source aliases require an aggregator (Phase 2 limitation)".into(),
216 ));
217 }
218 };
219
220 // Use `table_name` from sources when available; otherwise fall back to
221 // the legacy `table_fallback` arg (per-table alias_run path).
222 let effective_table = table_name.or(table_fallback);
223
224 let entry = registry.resolve(effective_table)?;
225 let store = Arc::clone(&entry.store);
226 let schema = Arc::clone(&entry.schema);
227
228 filter.validate(&schema)?;
229 let records = store.list(limit, offset, Some(filter)).await?;
230 let records = apply_projection(records, &fields, &schema)?;
231 Ok(AliasRunValue::Rows(records))
232}
233
234// =============================================================================
235// Tests
236// =============================================================================
237
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use tempfile::{TempDir, tempdir};

    use super::*;
    use crate::aggregator::{AliasAggregator, SourceSpec};
    use crate::alias_storage::AliasRecord;
    use crate::registry::{TableEntry, TableRegistry};
    use crate::schema::{FieldDef, FieldType, SchemaConfig};
    use crate::store::Store;

    // -----------------------------------------------------------------------
    // Test helpers
    // -----------------------------------------------------------------------

    /// Plain-JSON filter matching rows whose `status` is `"open"`.
    /// ListFilter uses the `{"type":"eq",...}` shape.
    const OPEN_FILTER: &str = r#"{"type":"eq","field":"status","value":"open"}"#;

    /// Filter template that substitutes `{{ status }}` from the params.
    const STATUS_TEMPLATE: &str = r#"{"type":"eq","field":"status","value":"{{ status }}"}"#;

    /// Minimal schema with a `status` string field (direct struct construction).
    fn status_schema() -> SchemaConfig {
        SchemaConfig {
            table: "items".into(),
            title: None,
            description: None,
            fields: vec![FieldDef {
                name: "status".into(),
                ty: FieldType::String,
                required: false,
                description: None,
            }],
            dump: None,
        }
    }

    /// Build a tempfile-backed [`Store`] with seed rows.
    ///
    /// The returned [`TempDir`] owns the directory holding the database file.
    /// The caller must keep it alive while the store is in use; the directory
    /// is removed when the guard is dropped, so nothing is left on disk.
    async fn make_store_with_rows(
        schema: &SchemaConfig,
        rows: Vec<serde_json::Value>,
    ) -> (TempDir, Store) {
        let dir = tempdir().expect("tempdir");
        let db_path = dir.path().join("test.db");
        let store = Store::open(&db_path, schema.clone())
            .await
            .expect("store open");
        for row in rows {
            store.create(row).await.expect("insert row");
        }
        (dir, store)
    }

    /// Build a single-table [`TableRegistry`] from a store + schema.
    fn registry_from_store(table: &str, store: Store, schema: SchemaConfig) -> TableRegistry {
        let mut entries = HashMap::new();
        entries.insert(
            table.to_string(),
            TableEntry {
                store: Arc::new(store),
                schema: Arc::new(schema),
                schema_path: Arc::new(std::path::PathBuf::new()),
            },
        );
        TableRegistry::from_entries(entries, Some(table.to_string()))
    }

    /// Seed an `items` table with `rows` and wrap it in a single-table registry.
    ///
    /// The guard comes first in the tuple so that, when destructured as
    /// `(_db_dir, registry)`, the registry (and its store) is dropped before
    /// the directory is removed.
    async fn items_registry(rows: Vec<serde_json::Value>) -> (TempDir, TableRegistry) {
        let schema = status_schema();
        let (db_dir, store) = make_store_with_rows(&schema, rows).await;
        (db_dir, registry_from_store("items", store, schema))
    }

    /// Build a minimal [`AliasRecord`] (no aggregator, no params).
    fn plain_alias(sources: SourceSpec, filter_json: &str) -> AliasRecord {
        AliasRecord {
            name: "test_alias".into(),
            sources,
            aggregator: None,
            filter: filter_json.into(),
            default_limit: None,
            description: None,
            params_schema: None,
            scope: None,
        }
    }

    /// Build a parameterised alias over `items` using [`STATUS_TEMPLATE`].
    fn templated_alias() -> AliasRecord {
        AliasRecord {
            name: "templated_alias".into(),
            params_schema: Some(r#"["status"]"#.into()),
            ..plain_alias(SourceSpec::Single("items".into()), STATUS_TEMPLATE)
        }
    }

    /// Assert that `result` is the rows variant holding exactly one row whose
    /// `status` equals `expected`.
    #[track_caller]
    fn assert_single_row_with_status(result: AliasRunValue, expected: &str) {
        match result {
            AliasRunValue::Rows(rows) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(rows[0].data["status"], expected);
            }
            other => panic!("expected Rows, got {other:?}"),
        }
    }

    // -----------------------------------------------------------------------
    // Test: Rows path — Single source + plain filter
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn rows_path_single_source() {
        let (_db_dir, registry) = items_registry(vec![
            serde_json::json!({"status": "open"}),
            serde_json::json!({"status": "closed"}),
        ])
        .await;

        // Filter: only "open" rows.
        let record = plain_alias(SourceSpec::Single("items".into()), OPEN_FILTER);

        let result = execute_alias_run(&registry, record, None, None, None, None, None)
            .await
            .expect("execute_alias_run");

        assert_single_row_with_status(result, "open");
    }

    // -----------------------------------------------------------------------
    // Test: Aggregator path — Count
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn aggregator_path_count() {
        let (_db_dir, registry) = items_registry(vec![
            serde_json::json!({"status": "open"}),
            serde_json::json!({"status": "open"}),
            serde_json::json!({"status": "closed"}),
        ])
        .await;

        // Use an "open" status filter to match 2 of 3 rows.
        let record = AliasRecord {
            name: "count_alias".into(),
            aggregator: Some(AliasAggregator::Count),
            ..plain_alias(SourceSpec::Single("items".into()), OPEN_FILTER)
        };

        let result = execute_alias_run(&registry, record, None, None, None, None, None)
            .await
            .expect("execute_alias_run");

        match result {
            AliasRunValue::Aggregate(AliasRunResult::Count(n)) => assert_eq!(n, 2),
            other => panic!("expected Aggregate(Count(2)), got {other:?}"),
        }
    }

    // -----------------------------------------------------------------------
    // Test: MiniJinja render — params_schema=Some, template substitution
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn jinja_render_substitution() {
        let (_db_dir, registry) = items_registry(vec![
            serde_json::json!({"status": "open"}),
            serde_json::json!({"status": "closed"}),
        ])
        .await;

        let params = serde_json::json!({"status": "closed"});

        let result = execute_alias_run(
            &registry,
            templated_alias(),
            Some(params),
            None,
            None,
            None,
            None,
        )
        .await
        .expect("execute_alias_run");

        assert_single_row_with_status(result, "closed");
    }

    // -----------------------------------------------------------------------
    // Test: MCP transport stringified params — Value::String("{...}") path
    //
    // Some MCP transports (notably the Claude Code stdio client) deliver
    // `Option<serde_json::Value>` argument fields as JSON-encoded strings
    // rather than parsed objects. Verify the defensive re-parse path so
    // `{{ key }}` still resolves when params arrives as Value::String.
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn jinja_render_with_stringified_params() {
        let (_db_dir, registry) = items_registry(vec![
            serde_json::json!({"status": "open"}),
            serde_json::json!({"status": "closed"}),
        ])
        .await;

        // params arrives as Value::String containing a JSON-encoded object
        // (the failure mode observed via the Claude Code MCP transport).
        let stringified_params = serde_json::Value::String(r#"{"status": "closed"}"#.to_string());

        let result = execute_alias_run(
            &registry,
            templated_alias(),
            Some(stringified_params),
            None,
            None,
            None,
            None,
        )
        .await
        .expect("execute_alias_run must re-parse stringified params");

        assert_single_row_with_status(result, "closed");
    }

    // -----------------------------------------------------------------------
    // Test: params_schema=Some but no params supplied → error
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn templated_alias_without_params_is_error() {
        let (_db_dir, registry) = items_registry(vec![]).await;

        let err = execute_alias_run(&registry, templated_alias(), None, None, None, None, None)
            .await
            .expect_err("should fail");

        assert!(
            matches!(err, MiniAppError::AliasParamsRequired { .. }),
            "unexpected error: {err}"
        );
    }

    // -----------------------------------------------------------------------
    // Test: Legacy mode fallback — sources Single("") + table_fallback
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn legacy_mode_table_fallback() {
        let (_db_dir, registry) = items_registry(vec![serde_json::json!({"status": "open"})]).await;

        // Simulate a legacy record where `sources` has an empty Single table
        // name and `table_fallback` provides the real name.
        let record = AliasRecord {
            name: "legacy_alias".into(),
            ..plain_alias(SourceSpec::Single(String::new()), OPEN_FILTER)
        };

        let result = execute_alias_run(&registry, record, None, Some("items"), None, None, None)
            .await
            .expect("execute_alias_run");

        match result {
            AliasRunValue::Rows(rows) => assert!(!rows.is_empty()),
            other => panic!("expected Rows, got {other:?}"),
        }
    }

    // -----------------------------------------------------------------------
    // Test: Multi/Pattern without aggregator → error
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn multi_without_aggregator_is_error() {
        let (_db_dir, registry) = items_registry(vec![]).await;

        let record = AliasRecord {
            name: "multi_alias".into(),
            ..plain_alias(
                SourceSpec::Multi(vec!["items".into(), "other".into()]),
                OPEN_FILTER,
            )
        };

        let err = execute_alias_run(&registry, record, None, None, None, None, None)
            .await
            .expect_err("should fail");

        let msg = err.to_string();
        assert!(
            msg.contains("Multi/Pattern source aliases require an aggregator"),
            "unexpected error: {msg}"
        );
    }
}
549}