Skip to main content

mini_app_core/
alias_run.rs

1//! Top-level orchestration for the `alias_run` MCP tool.
2//!
3//! This module exposes [`execute_alias_run`] as the single entry point for
4//! running a named alias. Both the MCP tool handler and direct SDK consumers
5//! call this function; the MCP handler is a thin wrapper that:
6//! 1. Resolves the [`AliasRecord`] from global or per-table storage.
7//! 2. Calls [`execute_alias_run`].
8//! 3. Serialises the [`AliasRunValue`] result to JSON (backward-compat shape).
9//!
10//! # Crux compliance
11//!
12//! - **Crux #1 / #2**: `crates/core/Cargo.toml` must not declare `rmcp` as a
13//!   dependency. All types here are pure Core-native types. MCP boundary
14//!   conversions (e.g. `MiniAppError → rmcp::ErrorData`) are the sole
15//!   responsibility of private adapter functions in `crates/mcp`.
16//!
17//! # MiniJinja render pipeline (Crux §1/#2)
18//!
19//! When `record.params_schema` is `Some`, the `filter` field is a MiniJinja
20//! template. The template is rendered with `params` as the context, then the
21//! rendered string is parsed as a [`ListFilter`] JSON document. When
22//! `params_schema` is `None`, the render step is skipped entirely for backward
23//! compatibility with plain-JSON aliases.
24
25use std::sync::Arc;
26
27use serde::Serialize;
28
29use crate::aggregator::{AliasAggregator, AliasRunResult, SourceSpec, execute_aggregate};
30use crate::alias_storage::AliasRecord;
31use crate::error::MiniAppError;
32use crate::filter::ListFilter;
33use crate::materialize::{FieldSelector, apply_projection};
34use crate::registry::TableRegistry;
35
36// =============================================================================
37// Result type
38// =============================================================================
39
/// The result of [`execute_alias_run`].
///
/// Wraps both the plain Rows path (per-table `store.list` + field projection)
/// and the Aggregator path (`execute_aggregate`). MCP callers serialise each
/// variant with its natural JSON shape to preserve backward compatibility:
///
/// - `Rows(records)` → `serde_json::to_string(&records)`
/// - `Aggregate(result)` → `serde_json::to_string(&result)`
///
/// In other words, callers are expected to match on the variant and serialise
/// the *payload*, not the enum value itself.
// NOTE(review): `Serialize` is derived here without any `#[serde(...)]`
// representation attribute, so serialising an `AliasRunValue` directly
// presumably yields serde's default (externally tagged) enum shape rather than
// the bare payload shapes listed above — confirm no caller relies on that.
#[derive(Debug, Serialize)]
pub enum AliasRunValue {
    /// Plain rows path — result of `store.list` + field projection.
    Rows(Vec<crate::store::RowRecord>),
    /// Aggregator path — wraps the existing [`AliasRunResult`].
    Aggregate(AliasRunResult),
}
55
56// =============================================================================
57// Public entry point
58// =============================================================================
59
60/// Execute an alias and return the typed result.
61///
62/// This is the canonical alias_run implementation. Both the MCP `alias_run`
63/// tool handler and direct SDK consumers call this function.
64///
65/// # Arguments
66///
67/// - `registry` — Live [`TableRegistry`]; used to resolve store handles and
68///   schema configs from table names, and to collect table names for
69///   [`SourceSpec::Pattern`] resolution.
70/// - `record` — The [`AliasRecord`] that describes the alias (sources,
71///   aggregator, filter template, parameter schema, default limit).
72/// - `params` — Optional JSON value used as the MiniJinja render context.
73///   Required when `record.params_schema` is `Some`; ignored (and silently
74///   accepted) when `None`.
75/// - `table_fallback` — Legacy single-table mode: the `table` argument
76///   supplied to `alias_run`. Used when `record.sources` is
77///   `SourceSpec::Single` with an empty placeholder produced by the legacy
78///   per-table path. Ignored when `record.sources` is already a fully
79///   populated `Single`/`Multi`/`Pattern`.
80/// - `limit_override` — Caller-supplied row limit. Falls back to
81///   `record.default_limit` when `None`.
82/// - `offset` — Number of rows to skip (plain Rows path only).
83/// - `fields` — Field projection selector (plain Rows path only).
84///
85/// # Errors
86///
87/// - [`MiniAppError::AliasParamsRequired`] — `params_schema` is `Some` but
88///   `params` is `None`.
89/// - [`MiniAppError::AliasTemplateError`] — MiniJinja render failure.
90/// - [`MiniAppError::Filter`] — filter parse or validate failure.
91/// - [`MiniAppError::Aggregator`] — aggregator execute failure.
92/// - [`MiniAppError::TableNotFound`] — a referenced table is not in the
93///   registry.
94/// - [`MiniAppError::Storage`] — underlying SQLite error.
95pub async fn execute_alias_run(
96    registry: &TableRegistry,
97    record: AliasRecord,
98    params: Option<serde_json::Value>,
99    table_fallback: Option<&str>,
100    limit_override: Option<u32>,
101    offset: Option<u32>,
102    fields: Option<FieldSelector>,
103) -> Result<AliasRunValue, MiniAppError> {
104    // -----------------------------------------------------------------
105    // Step 1: Render the filter template (Crux #1/#2).
106    // -----------------------------------------------------------------
107    let filter_text = record.filter;
108    let filter: ListFilter = if record.params_schema.is_some() {
109        let mut params_value = params.ok_or_else(|| MiniAppError::AliasParamsRequired {
110            name: record.name.clone(),
111        })?;
112        // Defensive parse: some MCP transports (notably the Claude Code
113        // stdio client) stringify `serde_json::Value` argument fields, so
114        // a `params: {"k": "v"}` payload arrives here as
115        // `Value::String("{\"k\":\"v\"}")` rather than `Value::Object(..)`.
116        // When minijinja receives a String it cannot resolve `{{ k }}`
117        // (the String has no keys), the placeholder evaluates to undefined
118        // and lenient render emits an empty value. Re-parse JSON-encoded
119        // strings into their Object form so render works regardless of
120        // the transport.
121        if let serde_json::Value::String(ref s) = params_value {
122            params_value = serde_json::from_str(s).map_err(|e| {
123                MiniAppError::AliasTemplateError(format!(
124                    "params arrived as a string but failed JSON parse: {e}"
125                ))
126            })?;
127        }
128        let env = minijinja::Environment::new();
129        let rendered = env
130            .render_str(&filter_text, &params_value)
131            .map_err(|e| MiniAppError::AliasTemplateError(e.to_string()))?;
132        serde_json::from_str(&rendered)
133            .map_err(|e| MiniAppError::Schema(format!("rendered filter parse error: {e}")))?
134    } else {
135        serde_json::from_str(&filter_text)
136            .map_err(|e| MiniAppError::Schema(format!("filter parse error: {e}")))?
137    };
138
139    let limit = limit_override.or(record.default_limit);
140
141    // -----------------------------------------------------------------
142    // Step 2: Aggregator path dispatch.
143    // -----------------------------------------------------------------
144    if let Some(agg) = record.aggregator {
145        return execute_aggregator_path(registry, record.sources, filter, agg, limit).await;
146    }
147
148    // -----------------------------------------------------------------
149    // Step 3: Plain (Rows) path.
150    // -----------------------------------------------------------------
151    execute_rows_path(
152        registry,
153        record.sources,
154        table_fallback,
155        filter,
156        limit,
157        offset,
158        fields,
159    )
160    .await
161}
162
163// =============================================================================
164// Internal helpers
165// =============================================================================
166
167/// Aggregator execution path: resolves Pattern sources, validates, and calls
168/// [`execute_aggregate`].
169async fn execute_aggregator_path(
170    registry: &TableRegistry,
171    sources: SourceSpec,
172    filter: ListFilter,
173    agg: AliasAggregator,
174    _limit: Option<u32>,
175) -> Result<AliasRunValue, MiniAppError> {
176    let resolved = if sources.requires_resolve() {
177        let table_names: Vec<String> = registry.table_names().map(str::to_owned).collect();
178        sources.resolve_pattern(&table_names)?
179    } else {
180        sources
181    };
182
183    let schema_table =
184        resolved.tables().first().cloned().ok_or_else(|| {
185            MiniAppError::Aggregator("alias sources resolved to zero tables".into())
186        })?;
187
188    let schema = Arc::clone(&registry.resolve(Some(schema_table.as_str()))?.schema);
189
190    filter.validate(&schema)?;
191
192    let result = execute_aggregate(registry, resolved, Some(filter), agg, &schema).await?;
193    Ok(AliasRunValue::Aggregate(result))
194}
195
196/// Plain rows execution path: resolves a single-table store and returns rows
197/// after applying optional field projection.
198async fn execute_rows_path(
199    registry: &TableRegistry,
200    sources: SourceSpec,
201    table_fallback: Option<&str>,
202    filter: ListFilter,
203    limit: Option<u32>,
204    offset: Option<u32>,
205    fields: Option<FieldSelector>,
206) -> Result<AliasRunValue, MiniAppError> {
207    let table_name: Option<&str> = match &sources {
208        // Non-empty Single → use it directly.
209        // Empty-string Single is the legacy sentinel produced by the MCP wrapper
210        // when reading from per-table `_aliases`; fall through to `table_fallback`.
211        SourceSpec::Single(t) if !t.is_empty() => Some(t.as_str()),
212        SourceSpec::Single(_) => None,
213        SourceSpec::Multi(_) | SourceSpec::Pattern(_) => {
214            return Err(MiniAppError::Aggregator(
215                "Multi/Pattern source aliases require an aggregator (Phase 2 limitation)".into(),
216            ));
217        }
218    };
219
220    // Use `table_name` from sources when available; otherwise fall back to
221    // the legacy `table_fallback` arg (per-table alias_run path).
222    let effective_table = table_name.or(table_fallback);
223
224    let entry = registry.resolve(effective_table)?;
225    let store = Arc::clone(&entry.store);
226    let schema = Arc::clone(&entry.schema);
227
228    filter.validate(&schema)?;
229    let records = store.list(limit, offset, Some(filter)).await?;
230    let records = apply_projection(records, &fields, &schema)?;
231    Ok(AliasRunValue::Rows(records))
232}
233
234// =============================================================================
235// Tests
236// =============================================================================
237
238#[cfg(test)]
239mod tests {
240    use std::collections::HashMap;
241
242    use tempfile::tempdir;
243
244    use super::*;
245    use crate::aggregator::{AliasAggregator, SourceSpec};
246    use crate::alias_storage::AliasRecord;
247    use crate::registry::{TableEntry, TableRegistry};
248    use crate::schema::{FieldDef, FieldType, SchemaConfig};
249    use crate::store::Store;
250
251    // -----------------------------------------------------------------------
252    // Test helpers
253    // -----------------------------------------------------------------------
254
255    /// Minimal schema with a `status` string field (direct struct construction).
256    fn status_schema() -> SchemaConfig {
257        SchemaConfig {
258            table: "items".into(),
259            title: None,
260            description: None,
261            fields: vec![FieldDef {
262                name: "status".into(),
263                ty: FieldType::String,
264                required: false,
265                description: None,
266            }],
267            dump: None,
268        }
269    }
270
271    /// Build a tempfile-backed Store with seed rows.
272    async fn make_store_with_rows(schema: &SchemaConfig, rows: Vec<serde_json::Value>) -> Store {
273        let dir = tempdir().expect("tempdir");
274        let db_path = dir.path().join("test.db");
275        let store = Store::open(&db_path, schema.clone())
276            .await
277            .expect("store open");
278        // Leak the tempdir so the db file lives for the test duration.
279        std::mem::forget(dir);
280        for row in rows {
281            store.create(row).await.expect("insert row");
282        }
283        store
284    }
285
286    /// Build a single-table [`TableRegistry`] from a store + schema.
287    fn registry_from_store(table: &str, store: Store, schema: SchemaConfig) -> TableRegistry {
288        let mut entries = HashMap::new();
289        entries.insert(
290            table.to_string(),
291            TableEntry {
292                store: Arc::new(store),
293                schema: Arc::new(schema),
294                schema_path: Arc::new(std::path::PathBuf::new()),
295            },
296        );
297        TableRegistry::from_entries(entries, Some(table.to_string()))
298    }
299
300    /// Build a minimal [`AliasRecord`] (no aggregator, no params).
301    fn plain_alias(sources: SourceSpec, filter_json: &str) -> AliasRecord {
302        AliasRecord {
303            name: "test_alias".into(),
304            sources,
305            aggregator: None,
306            filter: filter_json.into(),
307            default_limit: None,
308            description: None,
309            params_schema: None,
310            scope: None,
311        }
312    }
313
314    // -----------------------------------------------------------------------
315    // Test: Rows path — Single source + plain filter
316    // -----------------------------------------------------------------------
317    #[tokio::test]
318    async fn rows_path_single_source() {
319        let schema = status_schema();
320        let store = make_store_with_rows(
321            &schema,
322            vec![
323                serde_json::json!({"status": "open"}),
324                serde_json::json!({"status": "closed"}),
325            ],
326        )
327        .await;
328        let registry = registry_from_store("items", store, schema);
329
330        // Filter: only "open" rows. ListFilter uses {"type":"eq",...} shape.
331        let filter_json = r#"{"type":"eq","field":"status","value":"open"}"#;
332        let record = plain_alias(SourceSpec::Single("items".into()), filter_json);
333
334        let result = execute_alias_run(&registry, record, None, None, None, None, None)
335            .await
336            .expect("execute_alias_run");
337
338        match result {
339            AliasRunValue::Rows(rows) => {
340                assert_eq!(rows.len(), 1);
341                assert_eq!(rows[0].data["status"], "open");
342            }
343            other => panic!("expected Rows, got {other:?}"),
344        }
345    }
346
347    // -----------------------------------------------------------------------
348    // Test: Aggregator path — Count
349    // -----------------------------------------------------------------------
350    #[tokio::test]
351    async fn aggregator_path_count() {
352        let schema = status_schema();
353        let store = make_store_with_rows(
354            &schema,
355            vec![
356                serde_json::json!({"status": "open"}),
357                serde_json::json!({"status": "open"}),
358                serde_json::json!({"status": "closed"}),
359            ],
360        )
361        .await;
362        let registry = registry_from_store("items", store, schema);
363
364        // Use an "open" status filter to match 2 of 3 rows.
365        let record = AliasRecord {
366            name: "count_alias".into(),
367            sources: SourceSpec::Single("items".into()),
368            aggregator: Some(AliasAggregator::Count),
369            filter: r#"{"type":"eq","field":"status","value":"open"}"#.into(),
370            default_limit: None,
371            description: None,
372            params_schema: None,
373            scope: None,
374        };
375
376        let result = execute_alias_run(&registry, record, None, None, None, None, None)
377            .await
378            .expect("execute_alias_run");
379
380        match result {
381            AliasRunValue::Aggregate(AliasRunResult::Count(n)) => assert_eq!(n, 2),
382            other => panic!("expected Aggregate(Count(2)), got {other:?}"),
383        }
384    }
385
386    // -----------------------------------------------------------------------
387    // Test: MiniJinja render — params_schema=Some, template substitution
388    // -----------------------------------------------------------------------
389    #[tokio::test]
390    async fn jinja_render_substitution() {
391        let schema = status_schema();
392        let store = make_store_with_rows(
393            &schema,
394            vec![
395                serde_json::json!({"status": "open"}),
396                serde_json::json!({"status": "closed"}),
397            ],
398        )
399        .await;
400        let registry = registry_from_store("items", store, schema);
401
402        // Template: substitute {{ status }} from params. Uses ListFilter JSON shape.
403        let record = AliasRecord {
404            name: "templated_alias".into(),
405            sources: SourceSpec::Single("items".into()),
406            aggregator: None,
407            filter: r#"{"type":"eq","field":"status","value":"{{ status }}"}"#.into(),
408            default_limit: None,
409            description: None,
410            params_schema: Some(r#"["status"]"#.into()),
411            scope: None,
412        };
413
414        let params = serde_json::json!({"status": "closed"});
415
416        let result = execute_alias_run(&registry, record, Some(params), None, None, None, None)
417            .await
418            .expect("execute_alias_run");
419
420        match result {
421            AliasRunValue::Rows(rows) => {
422                assert_eq!(rows.len(), 1);
423                assert_eq!(rows[0].data["status"], "closed");
424            }
425            other => panic!("expected Rows, got {other:?}"),
426        }
427    }
428
429    // -----------------------------------------------------------------------
430    // Test: MCP transport stringified params — Value::String("{...}") path
431    //
432    // Some MCP transports (notably the Claude Code stdio client) deliver
433    // `Option<serde_json::Value>` argument fields as JSON-encoded strings
434    // rather than parsed objects. Verify the defensive re-parse path so
435    // `{{ key }}` still resolves when params arrives as Value::String.
436    // -----------------------------------------------------------------------
437    #[tokio::test]
438    async fn jinja_render_with_stringified_params() {
439        let schema = status_schema();
440        let store = make_store_with_rows(
441            &schema,
442            vec![
443                serde_json::json!({"status": "open"}),
444                serde_json::json!({"status": "closed"}),
445            ],
446        )
447        .await;
448        let registry = registry_from_store("items", store, schema);
449
450        let record = AliasRecord {
451            name: "templated_alias".into(),
452            sources: SourceSpec::Single("items".into()),
453            aggregator: None,
454            filter: r#"{"type":"eq","field":"status","value":"{{ status }}"}"#.into(),
455            default_limit: None,
456            description: None,
457            params_schema: Some(r#"["status"]"#.into()),
458            scope: None,
459        };
460
461        // params arrives as Value::String containing a JSON-encoded object
462        // (the failure mode observed via the Claude Code MCP transport).
463        let stringified_params = serde_json::Value::String(r#"{"status": "closed"}"#.to_string());
464
465        let result = execute_alias_run(
466            &registry,
467            record,
468            Some(stringified_params),
469            None,
470            None,
471            None,
472            None,
473        )
474        .await
475        .expect("execute_alias_run must re-parse stringified params");
476
477        match result {
478            AliasRunValue::Rows(rows) => {
479                assert_eq!(rows.len(), 1);
480                assert_eq!(rows[0].data["status"], "closed");
481            }
482            other => panic!("expected Rows, got {other:?}"),
483        }
484    }
485
486    // -----------------------------------------------------------------------
487    // Test: Legacy mode fallback — sources Single("") + table_fallback
488    // -----------------------------------------------------------------------
489    #[tokio::test]
490    async fn legacy_mode_table_fallback() {
491        let schema = status_schema();
492        let store =
493            make_store_with_rows(&schema, vec![serde_json::json!({"status": "open"})]).await;
494        let registry = registry_from_store("items", store, schema);
495
496        // Simulate a legacy record where `sources` has an empty Single table
497        // name and `table_fallback` provides the real name.
498        let record = AliasRecord {
499            name: "legacy_alias".into(),
500            sources: SourceSpec::Single(String::new()),
501            aggregator: None,
502            filter: r#"{"type":"eq","field":"status","value":"open"}"#.into(),
503            default_limit: None,
504            description: None,
505            params_schema: None,
506            scope: None,
507        };
508
509        let result = execute_alias_run(&registry, record, None, Some("items"), None, None, None)
510            .await
511            .expect("execute_alias_run");
512
513        match result {
514            AliasRunValue::Rows(rows) => assert!(!rows.is_empty()),
515            other => panic!("expected Rows, got {other:?}"),
516        }
517    }
518
519    // -----------------------------------------------------------------------
520    // Test: Multi/Pattern without aggregator → error
521    // -----------------------------------------------------------------------
522    #[tokio::test]
523    async fn multi_without_aggregator_is_error() {
524        let schema = status_schema();
525        let store = make_store_with_rows(&schema, vec![]).await;
526        let registry = registry_from_store("items", store, schema);
527
528        let record = AliasRecord {
529            name: "multi_alias".into(),
530            sources: SourceSpec::Multi(vec!["items".into(), "other".into()]),
531            aggregator: None,
532            filter: r#"{"type":"eq","field":"status","value":"open"}"#.into(),
533            default_limit: None,
534            description: None,
535            params_schema: None,
536            scope: None,
537        };
538
539        let err = execute_alias_run(&registry, record, None, None, None, None, None)
540            .await
541            .expect_err("should fail");
542
543        let msg = err.to_string();
544        assert!(
545            msg.contains("Multi/Pattern source aliases require an aggregator"),
546            "unexpected error: {msg}"
547        );
548    }
549}