Skip to main content

mini_app_core/
alias_run.rs

1//! Top-level orchestration for the `alias_run` MCP tool.
2//!
3//! This module exposes [`execute_alias_run`] as the single entry point for
4//! running a named alias. Both the MCP tool handler and direct SDK consumers
5//! call this function; the MCP handler is a thin wrapper that:
6//! 1. Resolves the [`AliasRecord`] from global or per-table storage.
7//! 2. Calls [`execute_alias_run`].
8//! 3. Serialises the [`AliasRunValue`] result to JSON (backward-compat shape).
9//!
10//! # Crux compliance
11//!
12//! - **Crux #1 / #2**: `crates/core/Cargo.toml` must not declare `rmcp` as a
13//!   dependency. All types here are pure Core-native types. MCP boundary
14//!   conversions (e.g. `MiniAppError → rmcp::ErrorData`) are the sole
15//!   responsibility of private adapter functions in `crates/mcp`.
16//!
17//! # MiniJinja render pipeline (Crux §1/#2)
18//!
19//! When `record.params_schema` is `Some`, the `filter` field is a MiniJinja
20//! template. The template is rendered with `params` as the context, then the
21//! rendered string is parsed as a [`ListFilter`] JSON document. When
22//! `params_schema` is `None`, the render step is skipped entirely for backward
23//! compatibility with plain-JSON aliases.
24
25use std::sync::Arc;
26
27use serde::Serialize;
28
29use crate::aggregator::{AliasAggregator, AliasRunResult, SourceSpec, execute_aggregate};
30use crate::alias_storage::AliasRecord;
31use crate::error::MiniAppError;
32use crate::filter::ListFilter;
33use crate::materialize::{FieldSelector, apply_projection};
34use crate::registry::TableRegistry;
35
36// =============================================================================
37// Result type
38// =============================================================================
39
/// The result of [`execute_alias_run`].
///
/// One variant per execution path:
///
/// - [`AliasRunValue::Rows`] — the plain Rows path (per-table `store.list`
///   followed by field projection).
/// - [`AliasRunValue::Aggregate`] — the Aggregator path
///   (`execute_aggregate`).
///
/// MCP callers serialise the *payload* of each variant with its natural JSON
/// shape to preserve backward compatibility:
///
/// - `Rows(records)` → `serde_json::to_string(&records)`
/// - `Aggregate(result)` → `serde_json::to_string(&result)`
///
/// NOTE(review): `Serialize` is derived here without any `#[serde(...)]`
/// representation attribute, so serialising the enum itself (rather than its
/// payload) presumably yields serde's default variant-tagged shape, which is
/// not the backward-compatible shape listed above — confirm callers always
/// match on the variant first.
#[derive(Debug, Serialize)]
pub enum AliasRunValue {
    /// Plain rows path — result of `store.list` + field projection.
    Rows(Vec<crate::store::RowRecord>),
    /// Aggregator path — wraps the existing [`AliasRunResult`].
    Aggregate(AliasRunResult),
}
55
56// =============================================================================
57// Public entry point
58// =============================================================================
59
60/// Execute an alias and return the typed result.
61///
62/// This is the canonical alias_run implementation. Both the MCP `alias_run`
63/// tool handler and direct SDK consumers call this function.
64///
65/// # Arguments
66///
67/// - `registry` — Live [`TableRegistry`]; used to resolve store handles and
68///   schema configs from table names, and to collect table names for
69///   [`SourceSpec::Pattern`] resolution.
70/// - `record` — The [`AliasRecord`] that describes the alias (sources,
71///   aggregator, filter template, parameter schema, default limit).
72/// - `params` — Optional JSON value used as the MiniJinja render context.
73///   Required when `record.params_schema` is `Some`; ignored (and silently
74///   accepted) when `None`.
75/// - `table_fallback` — Legacy single-table mode: the `table` argument
76///   supplied to `alias_run`. Used when `record.sources` is
77///   `SourceSpec::Single` with an empty placeholder produced by the legacy
78///   per-table path. Ignored when `record.sources` is already a fully
79///   populated `Single`/`Multi`/`Pattern`.
80/// - `limit_override` — Caller-supplied row limit. Falls back to
81///   `record.default_limit` when `None`.
82/// - `offset` — Number of rows to skip (plain Rows path only).
83/// - `fields` — Field projection selector (plain Rows path only).
84///
85/// # Errors
86///
87/// - [`MiniAppError::AliasParamsRequired`] — `params_schema` is `Some` but
88///   `params` is `None`.
89/// - [`MiniAppError::AliasTemplateError`] — MiniJinja render failure.
90/// - [`MiniAppError::Filter`] — filter parse or validate failure.
91/// - [`MiniAppError::Aggregator`] — aggregator execute failure.
92/// - [`MiniAppError::TableNotFound`] — a referenced table is not in the
93///   registry.
94/// - [`MiniAppError::Storage`] — underlying SQLite error.
95pub async fn execute_alias_run(
96    registry: &TableRegistry,
97    record: AliasRecord,
98    params: Option<serde_json::Value>,
99    table_fallback: Option<&str>,
100    limit_override: Option<u32>,
101    offset: Option<u32>,
102    fields: Option<FieldSelector>,
103) -> Result<AliasRunValue, MiniAppError> {
104    // -----------------------------------------------------------------
105    // Step 1: Render the filter template (Crux #1/#2).
106    // -----------------------------------------------------------------
107    let filter_text = record.filter;
108    let filter: ListFilter = if record.params_schema.is_some() {
109        let mut params_value = params.ok_or_else(|| MiniAppError::AliasParamsRequired {
110            name: record.name.clone(),
111        })?;
112        // Defensive parse: some MCP transports (notably the Claude Code
113        // stdio client) stringify `serde_json::Value` argument fields, so
114        // a `params: {"k": "v"}` payload arrives here as
115        // `Value::String("{\"k\":\"v\"}")` rather than `Value::Object(..)`.
116        // When minijinja receives a String it cannot resolve `{{ k }}`
117        // (the String has no keys), the placeholder evaluates to undefined
118        // and lenient render emits an empty value. Re-parse JSON-encoded
119        // strings into their Object form so render works regardless of
120        // the transport.
121        if let serde_json::Value::String(ref s) = params_value {
122            params_value = serde_json::from_str(s).map_err(|e| {
123                MiniAppError::AliasTemplateError(format!(
124                    "params arrived as a string but failed JSON parse: {e}"
125                ))
126            })?;
127        }
128        let env = minijinja::Environment::new();
129        let rendered = env
130            .render_str(&filter_text, &params_value)
131            .map_err(|e| MiniAppError::AliasTemplateError(e.to_string()))?;
132        serde_json::from_str(&rendered)
133            .map_err(|e| MiniAppError::Schema(format!("rendered filter parse error: {e}")))?
134    } else {
135        serde_json::from_str(&filter_text)
136            .map_err(|e| MiniAppError::Schema(format!("filter parse error: {e}")))?
137    };
138
139    let limit = limit_override.or(record.default_limit);
140
141    // -----------------------------------------------------------------
142    // Crux #2: Field-projection fallback.
143    //
144    // When the caller supplies `fields` at run-time, use it as-is.
145    // When the caller omits `fields` (None), fall back to the stored
146    // default from `record.fields`.
147    // When `record.fields` is also None, no projection is applied — all
148    // fields are returned (Crux #3: NULL must never become an empty list).
149    // -----------------------------------------------------------------
150    let fields = match fields {
151        Some(f) => Some(f),
152        None => match record.fields.as_deref() {
153            Some(json) => Some(serde_json::from_str(json).map_err(|e| {
154                MiniAppError::Schema(format!(
155                    "alias '{}' stored fields parse error: {e}",
156                    record.name
157                ))
158            })?),
159            None => None,
160        },
161    };
162
163    // -----------------------------------------------------------------
164    // Step 2: Aggregator path dispatch.
165    // -----------------------------------------------------------------
166    if let Some(agg) = record.aggregator {
167        return execute_aggregator_path(registry, record.sources, filter, agg, limit).await;
168    }
169
170    // -----------------------------------------------------------------
171    // Step 3: Plain (Rows) path.
172    // -----------------------------------------------------------------
173    execute_rows_path(
174        registry,
175        record.sources,
176        table_fallback,
177        filter,
178        limit,
179        offset,
180        fields,
181    )
182    .await
183}
184
185// =============================================================================
186// Internal helpers
187// =============================================================================
188
189/// Aggregator execution path: resolves Pattern sources, validates, and calls
190/// [`execute_aggregate`].
191async fn execute_aggregator_path(
192    registry: &TableRegistry,
193    sources: SourceSpec,
194    filter: ListFilter,
195    agg: AliasAggregator,
196    _limit: Option<u32>,
197) -> Result<AliasRunValue, MiniAppError> {
198    let resolved = if sources.requires_resolve() {
199        let table_names: Vec<String> = registry.table_names().map(str::to_owned).collect();
200        sources.resolve_pattern(&table_names)?
201    } else {
202        sources
203    };
204
205    let schema_table =
206        resolved.tables().first().cloned().ok_or_else(|| {
207            MiniAppError::Aggregator("alias sources resolved to zero tables".into())
208        })?;
209
210    let schema = Arc::clone(&registry.resolve(Some(schema_table.as_str()))?.schema);
211
212    filter.validate(&schema)?;
213
214    let result = execute_aggregate(registry, resolved, Some(filter), agg, &schema).await?;
215    Ok(AliasRunValue::Aggregate(result))
216}
217
218/// Plain rows execution path: resolves a single-table store and returns rows
219/// after applying optional field projection.
220async fn execute_rows_path(
221    registry: &TableRegistry,
222    sources: SourceSpec,
223    table_fallback: Option<&str>,
224    filter: ListFilter,
225    limit: Option<u32>,
226    offset: Option<u32>,
227    fields: Option<FieldSelector>,
228) -> Result<AliasRunValue, MiniAppError> {
229    let table_name: Option<&str> = match &sources {
230        // Non-empty Single → use it directly.
231        // Empty-string Single is the legacy sentinel produced by the MCP wrapper
232        // when reading from per-table `_aliases`; fall through to `table_fallback`.
233        SourceSpec::Single(t) if !t.is_empty() => Some(t.as_str()),
234        SourceSpec::Single(_) => None,
235        SourceSpec::Multi(_) | SourceSpec::Pattern(_) => {
236            return Err(MiniAppError::Aggregator(
237                "Multi/Pattern source aliases require an aggregator (Phase 2 limitation)".into(),
238            ));
239        }
240    };
241
242    // Use `table_name` from sources when available; otherwise fall back to
243    // the legacy `table_fallback` arg (per-table alias_run path).
244    let effective_table = table_name.or(table_fallback);
245
246    let entry = registry.resolve(effective_table)?;
247    let store = Arc::clone(&entry.store);
248    let schema = Arc::clone(&entry.schema);
249
250    filter.validate(&schema)?;
251    let records = store.list(limit, offset, Some(filter)).await?;
252    let records = apply_projection(records, &fields, &schema)?;
253    Ok(AliasRunValue::Rows(records))
254}
255
256// =============================================================================
257// Tests
258// =============================================================================
259
#[cfg(test)]
mod tests {
    //! Unit tests for [`execute_alias_run`].
    //!
    //! Every test runs against a tempfile-backed store. The `TempDir` guard
    //! is returned from the fixtures and bound in each test so the database
    //! directory is removed when the test finishes instead of being leaked.

    use std::collections::HashMap;

    use tempfile::{TempDir, tempdir};

    use super::*;
    use crate::aggregator::{AliasAggregator, SourceSpec};
    use crate::alias_storage::AliasRecord;
    use crate::registry::{TableEntry, TableRegistry};
    use crate::schema::{FieldDef, FieldType, SchemaConfig};
    use crate::store::Store;

    // -----------------------------------------------------------------------
    // Test helpers
    // -----------------------------------------------------------------------

    /// Plain-JSON filter matching rows whose `status` is `"open"`.
    /// ListFilter uses the `{"type":"eq",...}` shape.
    const OPEN_FILTER: &str = r#"{"type":"eq","field":"status","value":"open"}"#;

    /// MiniJinja filter template that substitutes `{{ status }}` from params.
    const STATUS_TEMPLATE: &str = r#"{"type":"eq","field":"status","value":"{{ status }}"}"#;

    /// Minimal schema with a `status` string field (direct struct construction).
    fn status_schema() -> SchemaConfig {
        SchemaConfig {
            table: "items".into(),
            title: None,
            description: None,
            fields: vec![FieldDef {
                name: "status".into(),
                ty: FieldType::String,
                required: false,
                description: None,
            }],
            dump: None,
        }
    }

    /// Build a tempfile-backed Store with seed rows.
    ///
    /// The returned [`TempDir`] owns the directory holding the database
    /// file; the caller must keep it alive for as long as the store is used.
    async fn make_store_with_rows(
        schema: &SchemaConfig,
        rows: Vec<serde_json::Value>,
    ) -> (Store, TempDir) {
        let dir = tempdir().expect("tempdir");
        let db_path = dir.path().join("test.db");
        let store = Store::open(&db_path, schema.clone())
            .await
            .expect("store open");
        for row in rows {
            store.create(row).await.expect("insert row");
        }
        (store, dir)
    }

    /// Build a single-table [`TableRegistry`] from a store + schema.
    fn registry_from_store(table: &str, store: Store, schema: SchemaConfig) -> TableRegistry {
        let mut entries = HashMap::new();
        entries.insert(
            table.to_string(),
            TableEntry {
                store: Arc::new(store),
                schema: Arc::new(schema),
                schema_path: Arc::new(std::path::PathBuf::new()),
            },
        );
        TableRegistry::from_entries(entries, Some(table.to_string()))
    }

    /// Build an `items` registry seeded with one row per given status.
    ///
    /// Bind the returned guard to a named variable (e.g. `_dir`) so the
    /// backing directory outlives the registry within the test.
    async fn items_registry(statuses: &[&str]) -> (TableRegistry, TempDir) {
        let schema = status_schema();
        let rows = statuses
            .iter()
            .map(|status| serde_json::json!({ "status": *status }))
            .collect();
        let (store, dir) = make_store_with_rows(&schema, rows).await;
        (registry_from_store("items", store, schema), dir)
    }

    /// Build a minimal [`AliasRecord`] (no aggregator, no params, no stored fields).
    fn plain_alias(sources: SourceSpec, filter_json: &str) -> AliasRecord {
        AliasRecord {
            name: "test_alias".into(),
            sources,
            aggregator: None,
            filter: filter_json.into(),
            default_limit: None,
            description: None,
            params_schema: None,
            fields: None,
            scope: None,
        }
    }

    /// Build a templated alias over `items` that requires a `status` param.
    fn templated_alias() -> AliasRecord {
        AliasRecord {
            name: "templated_alias".into(),
            params_schema: Some(r#"["status"]"#.into()),
            ..plain_alias(SourceSpec::Single("items".into()), STATUS_TEMPLATE)
        }
    }

    /// Assert that `result` is the Rows variant holding exactly one row whose
    /// `status` equals `expected`.
    fn assert_single_status(result: AliasRunValue, expected: &str) {
        match result {
            AliasRunValue::Rows(rows) => {
                assert_eq!(rows.len(), 1);
                assert_eq!(rows[0].data["status"], expected);
            }
            other => panic!("expected Rows, got {other:?}"),
        }
    }

    // -----------------------------------------------------------------------
    // Test: Rows path — Single source + plain filter
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn rows_path_single_source() {
        let (registry, _dir) = items_registry(&["open", "closed"]).await;
        let record = plain_alias(SourceSpec::Single("items".into()), OPEN_FILTER);

        let result = execute_alias_run(&registry, record, None, None, None, None, None)
            .await
            .expect("execute_alias_run");

        assert_single_status(result, "open");
    }

    // -----------------------------------------------------------------------
    // Test: Aggregator path — Count
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn aggregator_path_count() {
        let (registry, _dir) = items_registry(&["open", "open", "closed"]).await;

        // The "open" status filter matches 2 of the 3 rows.
        let record = AliasRecord {
            name: "count_alias".into(),
            aggregator: Some(AliasAggregator::Count),
            ..plain_alias(SourceSpec::Single("items".into()), OPEN_FILTER)
        };

        let result = execute_alias_run(&registry, record, None, None, None, None, None)
            .await
            .expect("execute_alias_run");

        match result {
            AliasRunValue::Aggregate(AliasRunResult::Count(n)) => assert_eq!(n, 2),
            other => panic!("expected Aggregate(Count(2)), got {other:?}"),
        }
    }

    // -----------------------------------------------------------------------
    // Test: MiniJinja render — params_schema=Some, template substitution
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn jinja_render_substitution() {
        let (registry, _dir) = items_registry(&["open", "closed"]).await;
        let params = serde_json::json!({"status": "closed"});

        let result = execute_alias_run(
            &registry,
            templated_alias(),
            Some(params),
            None,
            None,
            None,
            None,
        )
        .await
        .expect("execute_alias_run");

        assert_single_status(result, "closed");
    }

    // -----------------------------------------------------------------------
    // Test: MCP transport stringified params — Value::String("{...}") path
    //
    // Some MCP transports (notably the Claude Code stdio client) deliver
    // `Option<serde_json::Value>` argument fields as JSON-encoded strings
    // rather than parsed objects. Verify the defensive re-parse path so
    // `{{ key }}` still resolves when params arrives as Value::String.
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn jinja_render_with_stringified_params() {
        let (registry, _dir) = items_registry(&["open", "closed"]).await;

        // params arrives as Value::String containing a JSON-encoded object
        // (the failure mode observed via the Claude Code MCP transport).
        let stringified_params = serde_json::Value::String(r#"{"status": "closed"}"#.to_string());

        let result = execute_alias_run(
            &registry,
            templated_alias(),
            Some(stringified_params),
            None,
            None,
            None,
            None,
        )
        .await
        .expect("execute_alias_run must re-parse stringified params");

        assert_single_status(result, "closed");
    }

    // -----------------------------------------------------------------------
    // Test: templated alias without params → AliasParamsRequired
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn templated_alias_without_params_is_error() {
        let (registry, _dir) = items_registry(&[]).await;

        let err = execute_alias_run(
            &registry,
            templated_alias(),
            None,
            None,
            None,
            None,
            None,
        )
        .await
        .expect_err("should fail");

        assert!(
            matches!(err, MiniAppError::AliasParamsRequired { .. }),
            "unexpected error: {err}"
        );
    }

    // -----------------------------------------------------------------------
    // Test: stringified params that are not valid JSON → AliasTemplateError
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn stringified_params_invalid_json_is_error() {
        let (registry, _dir) = items_registry(&[]).await;
        let broken_params = serde_json::Value::String("{not json".to_string());

        let err = execute_alias_run(
            &registry,
            templated_alias(),
            Some(broken_params),
            None,
            None,
            None,
            None,
        )
        .await
        .expect_err("should fail");

        assert!(
            matches!(err, MiniAppError::AliasTemplateError(_)),
            "unexpected error: {err}"
        );
    }

    // -----------------------------------------------------------------------
    // Test: Legacy mode fallback — sources Single("") + table_fallback
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn legacy_mode_table_fallback() {
        let (registry, _dir) = items_registry(&["open"]).await;

        // Simulate a legacy record where `sources` has an empty Single table
        // name and `table_fallback` provides the real name.
        let record = AliasRecord {
            name: "legacy_alias".into(),
            ..plain_alias(SourceSpec::Single(String::new()), OPEN_FILTER)
        };

        let result = execute_alias_run(&registry, record, None, Some("items"), None, None, None)
            .await
            .expect("execute_alias_run");

        match result {
            AliasRunValue::Rows(rows) => assert!(!rows.is_empty()),
            other => panic!("expected Rows, got {other:?}"),
        }
    }

    // -----------------------------------------------------------------------
    // Test: Multi/Pattern without aggregator → error
    // -----------------------------------------------------------------------
    #[tokio::test]
    async fn multi_without_aggregator_is_error() {
        let (registry, _dir) = items_registry(&[]).await;

        let record = AliasRecord {
            name: "multi_alias".into(),
            ..plain_alias(
                SourceSpec::Multi(vec!["items".into(), "other".into()]),
                OPEN_FILTER,
            )
        };

        let err = execute_alias_run(&registry, record, None, None, None, None, None)
            .await
            .expect_err("should fail");

        let msg = err.to_string();
        assert!(
            msg.contains("Multi/Pattern source aliases require an aggregator"),
            "unexpected error: {msg}"
        );
    }
}