Skip to main content

benday_core/
ingest.rs

1//! Data ingestion: resolve the spec's inline data and/or a piped data
2//! document into a normalized `Table`. Pure — the CLI does the I/O.
3//!
4//! Strictness boundary: the SPEC is agent-authored intent, so its data object
5//! is strict (`deny_unknown_fields`, over in `spec.rs`). The stdin document is
6//! producer-shaped payload (e.g. an MCP `structuredContent` envelope), so it
7//! is tolerant: known fields are used, unknown fields (query provenance etc.)
8//! are ignored.
9
10use std::collections::HashMap;
11
12use serde::{Deserialize, Serialize};
13use serde_json::{Map, Value};
14
15use crate::error::Error;
16use crate::spec::{Column, Data, FieldType, Spec};
17
/// One row-major record: column name → cell value.
///
/// NOTE(review): `serde_json::Map` keeps insertion order only when serde_json's
/// `preserve_order` feature is enabled (otherwise keys are sorted). The columnar
/// path documents rows as "keyed in column order" — confirm the feature is on.
pub type Row = Map<String, Value>;
19
/// A data document piped to stdin: a columnar envelope or a bare row array.
/// Tolerant by design — no `deny_unknown_fields`.
///
/// `#[serde(untagged)]`: serde tries `Envelope` first, then `Rows`, and keeps
/// the first variant that deserializes. When neither fits, serde's error is
/// generic, which is why `parse_data_doc` rewraps it.
///
/// NOTE(review): serde's derived struct deserializer also accepts a positional
/// JSON array, so something like `[[{"name":"a"}],[[1]]]` presumably parses as
/// an `Envelope` instead of failing — confirm that tolerance is acceptable.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum DataDoc {
    /// Columnar form `{"columns": [...], "rows": [[...], ...]}`; unknown
    /// top-level keys (e.g. query provenance) are ignored.
    Envelope {
        /// Column descriptors; cells in each row follow this order.
        columns: Vec<EnvColumn>,
        /// Positional rows. Row width is not checked here — `columnar_to_rows`
        /// rejects rows whose length differs from `columns.len()`.
        rows: Vec<Vec<Value>>,
        /// Optional producer flag; not interpreted, only copied into
        /// `DataProvenance::truncated`.
        #[serde(default)]
        truncated: Option<bool>,
        /// Optional producer count; not interpreted, only copied into
        /// `DataProvenance::total_rows`.
        #[serde(default)]
        total_rows: Option<u64>,
    },
    /// Bare JSON array of row objects, already row-major.
    Rows(Vec<Row>),
}
35
/// Envelope column: tolerant twin of `spec::Column` (producers may add keys).
#[derive(Debug, Deserialize)]
pub struct EnvColumn {
    /// Column name; becomes the key in every resolved `Row`. Uniqueness is
    /// enforced by `columnar_to_rows`, not at parse time.
    pub name: String,
    /// Declared type as spelled by the producer (JSON key `type`), mapped via
    /// `declared_field_type`. `None` leaves the column out of `Table::declared`.
    #[serde(default, rename = "type")]
    pub ty: Option<String>,
}
43
44impl From<&Column> for EnvColumn {
45    fn from(c: &Column) -> Self {
46        EnvColumn {
47            name: c.name.clone(),
48            ty: c.ty.clone(),
49        }
50    }
51}
52
/// Which input form supplied a `Table`'s rows. Serialized in snake_case
/// (`"inline_values"`, `"inline_columns"`, `"stdin_values"`, `"stdin_columns"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum DataSource {
    /// Spec `data.values`: inline row objects.
    InlineValues,
    /// Spec `data.columns` + `data.rows`: inline columnar data.
    InlineColumns,
    /// Stdin bare array of row objects (`DataDoc::Rows`).
    StdinValues,
    /// Stdin columnar envelope (`DataDoc::Envelope`).
    StdinColumns,
}
61
/// Provenance recorded alongside a resolved `Table`.
#[derive(Debug, Serialize)]
pub struct DataProvenance {
    /// Which input form supplied the rows.
    pub source: DataSource,
    /// Envelope `truncated`, copied verbatim; always `None` for inline data
    /// and for bare stdin row arrays.
    pub truncated: Option<bool>,
    /// Envelope `total_rows`, copied verbatim; always `None` for inline data
    /// and for bare stdin row arrays.
    pub total_rows: Option<u64>,
}
68
/// Normalized data ready for the compiler: row-major rows (as the compiler
/// has always consumed), declared column types, and where it all came from.
#[derive(Debug)]
pub struct Table {
    /// Row-major records; `resolve` only returns a `Table` with at least one.
    pub rows: Vec<Row>,
    /// Field types for columns that declared a `type` (columnar forms only);
    /// empty for inline `values` and bare stdin row arrays.
    pub declared: HashMap<String, FieldType>,
    /// Source form plus any envelope truncation metadata.
    pub provenance: DataProvenance,
}
77
78/// Parse a stdin data document. Wraps serde's unhelpful untagged-enum error
79/// with the two accepted shapes.
80pub fn parse_data_doc(s: &str) -> Result<DataDoc, Error> {
81    serde_json::from_str(s).map_err(|e| {
82        Error::Data(format!(
83            "cannot parse stdin as a data document; expected \
84             {{\"columns\":[{{\"name\",\"type\"?}}...],\"rows\":[[...]...]}} or a JSON array \
85             of row objects: {e}"
86        ))
87    })
88}
89
90/// Map a declared column type (BigQuery + common SQL spellings, case-
91/// insensitive) to a field type. Unknown names fall back to nominal — NOT an
92/// error: producers grow types, and nominal is safe-wrong-in-the-obvious-way.
93/// DATE/TIMESTAMP map to ordinal BY DESIGN, not as an interim step: ISO
94/// strings sort lexically = chronologically, and SQL owns date bucketing and
95/// label formatting — benday has no temporal scale on the roadmap.
96pub fn declared_field_type(t: &str) -> FieldType {
97    match t.to_ascii_uppercase().as_str() {
98        "INT64" | "INTEGER" | "INT" | "SMALLINT" | "BIGINT" | "FLOAT64" | "FLOAT" | "DOUBLE"
99        | "NUMERIC" | "BIGNUMERIC" | "DECIMAL" | "REAL" => FieldType::Quantitative,
100        "DATE" | "DATETIME" | "TIMESTAMP" | "TIME" => FieldType::Ordinal,
101        _ => FieldType::Nominal,
102    }
103}
104
105/// Resolve spec + optional stdin document into a Table. Owns ALL precedence
106/// and data-shape errors so the corpus can pin them.
107pub fn resolve(spec: &Spec, stdin: Option<DataDoc>) -> Result<Table, Error> {
108    match (&spec.data, stdin) {
109        (Some(_), Some(_)) => Err(Error::Spec(
110            "data provided twice: the spec has inline `data` and a data document \
111             arrived on stdin; remove one"
112                .into(),
113        )),
114        (Some(data), None) => resolve_inline(data),
115        (None, Some(doc)) => resolve_stdin(doc),
116        (None, None) => Err(Error::Spec(
117            "no data: the spec has no `data` and nothing arrived on stdin; add \
118             data.values or data.columns+rows to the spec, or pipe a data document"
119                .into(),
120        )),
121    }
122}
123
124/// Resolve the spec's inline `data` object. Exactly one form is allowed:
125/// `values` (tidy row objects) or `columns` + `rows` (columnar). Any other
126/// combination is a spec error — serde can't express either/or without
127/// mangling the error paths, so it's checked here.
128fn resolve_inline(data: &Data) -> Result<Table, Error> {
129    match (&data.values, &data.columns, &data.rows) {
130        // Inline `values`: row-major already, no declared types.
131        (Some(values), None, None) => finish(
132            values.clone(),
133            HashMap::new(),
134            DataSource::InlineValues,
135            None,
136            None,
137        ),
138        (None, Some(columns), Some(rows)) => {
139            // Reuse the envelope's zip/validation. The spec's `Column` is the
140            // strict twin of `EnvColumn`; same fields, so convert and share.
141            let env: Vec<EnvColumn> = columns.iter().map(EnvColumn::from).collect();
142            let (rows, declared) = columnar_to_rows(&env, rows)?;
143            finish(rows, declared, DataSource::InlineColumns, None, None)
144        }
145        _ => Err(Error::Spec(
146            "data must contain either `values`, or `columns` and `rows`".into(),
147        )),
148    }
149}
150
151/// Resolve a stdin data document (envelope or bare rows) into a Table.
152fn resolve_stdin(doc: DataDoc) -> Result<Table, Error> {
153    match doc {
154        DataDoc::Envelope {
155            columns,
156            rows,
157            truncated,
158            total_rows,
159        } => {
160            let (rows, declared) = columnar_to_rows(&columns, &rows)?;
161            finish(
162                rows,
163                declared,
164                DataSource::StdinColumns,
165                truncated,
166                total_rows,
167            )
168        }
169        DataDoc::Rows(rows) => finish(rows, HashMap::new(), DataSource::StdinValues, None, None),
170    }
171}
172
173/// Zip a columnar envelope into row-major objects, keyed in column order, and
174/// collect declared column types. Shared shape for inline and stdin columnar.
175fn columnar_to_rows(
176    columns: &[EnvColumn],
177    rows: &[Vec<Value>],
178) -> Result<(Vec<Row>, HashMap<String, FieldType>), Error> {
179    let mut declared = HashMap::new();
180    let mut seen = std::collections::HashSet::new();
181    for col in columns {
182        if !seen.insert(col.name.as_str()) {
183            return Err(Error::Data(format!(
184                "duplicate column name \"{}\"",
185                col.name
186            )));
187        }
188        if let Some(ty) = &col.ty {
189            declared.insert(col.name.clone(), declared_field_type(ty));
190        }
191    }
192
193    let want = columns.len();
194    let mut out = Vec::with_capacity(rows.len());
195    for (i, row) in rows.iter().enumerate() {
196        if row.len() != want {
197            return Err(Error::Data(format!(
198                "row {i} has {} values but {want} columns are declared",
199                row.len()
200            )));
201        }
202        let mut obj = Row::new();
203        for (col, val) in columns.iter().zip(row) {
204            obj.insert(col.name.clone(), val.clone());
205        }
206        out.push(obj);
207    }
208    Ok((out, declared))
209}
210
211/// Apply the empty-rows rule (any form) and assemble the Table.
212fn finish(
213    rows: Vec<Row>,
214    declared: HashMap<String, FieldType>,
215    source: DataSource,
216    truncated: Option<bool>,
217    total_rows: Option<u64>,
218) -> Result<Table, Error> {
219    if rows.is_empty() {
220        return Err(Error::Data(
221            "data has no rows; provide at least one row".into(),
222        ));
223    }
224    Ok(Table {
225        rows,
226        declared,
227        provenance: DataProvenance {
228            source,
229            truncated,
230            total_rows,
231        },
232    })
233}
234
// Unit tests for ingestion: stdin parsing, source precedence, columnar
// zipping, declared-type mapping, and the exact error wording pinned by the
// inline `insta` snapshots.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // Minimal spec with one inline `values` row. `resolve` only inspects
    // `spec.data`, so the encoding fields are just filler.
    fn spec_with_values() -> Spec {
        serde_json::from_str(
            r#"{"data":{"values":[{"x":1}]},"mark":"bar",
               "encoding":{"x":{"field":"x"},"y":{"field":"y"}}}"#,
        )
        .expect("fixture spec parses")
    }

    // Same shape with an empty `values` array, to hit the empty-rows rule.
    fn spec_with_empty_values() -> Spec {
        serde_json::from_str(
            r#"{"data":{"values":[]},"mark":"bar",
               "encoding":{"x":{"field":"x"},"y":{"field":"y"}}}"#,
        )
        .expect("fixture spec parses")
    }

    #[test]
    fn parses_bare_row_array() {
        let doc = parse_data_doc(r#"[{"a": 1}, {"a": 2}]"#).expect("bare array parses");
        match doc {
            DataDoc::Rows(rows) => assert_eq!(rows.len(), 2),
            other => panic!("expected Rows, got {other:?}"),
        }
    }

    #[test]
    fn envelope_tolerates_unknown_keys() {
        // The producer emits a `query` provenance block benday ignores.
        let doc = parse_data_doc(
            r#"{"columns":[{"name":"day","type":"STRING"},{"name":"n","type":"INT64"}],
                "rows":[["mon",32]],
                "query":{"job_id":"abc","note":"ignored"}}"#,
        )
        .expect("envelope with extra keys parses");
        match doc {
            DataDoc::Envelope { columns, rows, .. } => {
                assert_eq!(columns.len(), 2);
                assert_eq!(rows.len(), 1);
            }
            other => panic!("expected Envelope, got {other:?}"),
        }
    }

    #[test]
    fn envelope_truncated_and_total_rows_reach_provenance() {
        let doc = parse_data_doc(
            r#"{"columns":[{"name":"n"}],"rows":[[1]],"truncated":true,"total_rows":123}"#,
        )
        .expect("envelope parses");
        let table = resolve_stdin(doc).expect("resolves");
        assert_eq!(table.provenance.source, DataSource::StdinColumns);
        assert_eq!(table.provenance.truncated, Some(true));
        assert_eq!(table.provenance.total_rows, Some(123));
    }

    #[test]
    fn bare_rows_provenance_has_no_envelope_fields() {
        let doc = parse_data_doc(r#"[{"a":1}]"#).expect("parses");
        let table = resolve_stdin(doc).expect("resolves");
        assert_eq!(table.provenance.source, DataSource::StdinValues);
        assert_eq!(table.provenance.truncated, None);
        assert_eq!(table.provenance.total_rows, None);
        assert!(table.declared.is_empty());
    }

    #[test]
    fn columnar_zips_rows_in_column_order() {
        let doc = parse_data_doc(
            r#"{"columns":[{"name":"day","type":"STRING"},{"name":"n","type":"INT64"}],
                "rows":[["mon",32],["tue",78]]}"#,
        )
        .expect("parses");
        let table = resolve_stdin(doc).expect("resolves");
        assert_eq!(table.rows.len(), 2);
        assert_eq!(table.rows[0].get("day"), Some(&json!("mon")));
        assert_eq!(table.rows[0].get("n"), Some(&json!(32)));
        assert_eq!(table.rows[1].get("day"), Some(&json!("tue")));
        assert_eq!(table.rows[1].get("n"), Some(&json!(78)));
        // keys land in declared column order
        // NOTE(review): "day" < "n" alphabetically, so this would also pass if
        // `Row` were key-sorted (serde_json without `preserve_order`). Columns
        // declared in non-alphabetical order would actually pin insertion order.
        let keys: Vec<&String> = table.rows[0].keys().collect();
        assert_eq!(keys, vec!["day", "n"]);
        assert_eq!(table.declared.get("day"), Some(&FieldType::Nominal));
        assert_eq!(table.declared.get("n"), Some(&FieldType::Quantitative));
    }

    #[test]
    fn duplicate_column_name_errors() {
        let doc = parse_data_doc(r#"{"columns":[{"name":"a"},{"name":"a"}],"rows":[[1,2]]}"#)
            .expect("parses");
        let err = resolve_stdin(doc).expect_err("duplicate must error");
        insta::assert_snapshot!(err.to_string(), @r###"duplicate column name "a""###);
    }

    #[test]
    fn row_length_mismatch_errors() {
        // Row indices in the message are zero-based: the second row is `row 1`.
        let doc =
            parse_data_doc(r#"{"columns":[{"name":"a"},{"name":"b"}],"rows":[[1,2],[1,2,3]]}"#)
                .expect("parses");
        let err = resolve_stdin(doc).expect_err("length mismatch must error");
        insta::assert_snapshot!(
            err.to_string(),
            @"row 1 has 3 values but 2 columns are declared"
        );
    }

    // The empty-rows rule lives in `finish`, so every form reports it identically.
    #[test]
    fn empty_rows_errors_columnar() {
        let doc = parse_data_doc(r#"{"columns":[{"name":"a"}],"rows":[]}"#).expect("parses");
        let err = resolve_stdin(doc).expect_err("empty rows must error");
        insta::assert_snapshot!(err.to_string(), @"data has no rows; provide at least one row");
    }

    #[test]
    fn empty_rows_errors_bare_array() {
        let doc = parse_data_doc(r#"[]"#).expect("parses");
        let err = resolve_stdin(doc).expect_err("empty rows must error");
        insta::assert_snapshot!(err.to_string(), @"data has no rows; provide at least one row");
    }

    #[test]
    fn empty_rows_errors_inline_values() {
        let err = resolve(&spec_with_empty_values(), None).expect_err("empty inline must error");
        insta::assert_snapshot!(err.to_string(), @"data has no rows; provide at least one row");
    }

    #[test]
    fn inline_values_resolve_to_inline_provenance() {
        let table = resolve(&spec_with_values(), None).expect("resolves");
        assert_eq!(table.provenance.source, DataSource::InlineValues);
        assert_eq!(table.provenance.truncated, None);
        assert_eq!(table.provenance.total_rows, None);
        assert!(table.declared.is_empty());
        assert_eq!(table.rows.len(), 1);
    }

    #[test]
    fn data_provided_twice_errors() {
        let doc = parse_data_doc(r#"[{"a":1}]"#).expect("parses");
        let err = resolve(&spec_with_values(), Some(doc)).expect_err("data twice must error");
        insta::assert_snapshot!(
            err.to_string(),
            @"data provided twice: the spec has inline `data` and a data document arrived on stdin; remove one"
        );
    }

    #[test]
    fn inline_columnar_resolves_with_declared_types() {
        let spec: Spec = serde_json::from_str(
            r#"{"data":{"columns":[{"name":"day","type":"STRING"},{"name":"n","type":"INT64"}],
                       "rows":[["mon",32],["tue",78]]},
                "mark":"bar","encoding":{"x":{"field":"day"},"y":{"field":"n"}}}"#,
        )
        .expect("inline columnar spec parses");
        let table = resolve(&spec, None).expect("resolves");
        assert_eq!(table.provenance.source, DataSource::InlineColumns);
        assert_eq!(table.rows.len(), 2);
        assert_eq!(table.rows[0].get("day"), Some(&json!("mon")));
        assert_eq!(table.rows[0].get("n"), Some(&json!(32)));
        assert_eq!(table.declared.get("day"), Some(&FieldType::Nominal));
        assert_eq!(table.declared.get("n"), Some(&FieldType::Quantitative));
    }

    #[test]
    fn inline_data_both_forms_errors() {
        let spec: Spec = serde_json::from_str(
            r#"{"data":{"values":[{"a":1}],"columns":[{"name":"a"}],"rows":[[1]]},
                "mark":"bar","encoding":{"x":{"field":"a"},"y":{"field":"a"}}}"#,
        )
        .expect("spec parses");
        let err = resolve(&spec, None).expect_err("both forms must error");
        insta::assert_snapshot!(
            err.to_string(),
            @"data must contain either `values`, or `columns` and `rows`"
        );
    }

    #[test]
    fn no_data_anywhere_errors() {
        let spec: Spec = serde_json::from_str(
            r#"{"mark":"bar","encoding":{"x":{"field":"a"},"y":{"field":"b"}}}"#,
        )
        .expect("spec without data parses");
        let err = resolve(&spec, None).expect_err("no data must error");
        insta::assert_snapshot!(
            err.to_string(),
            @"no data: the spec has no `data` and nothing arrived on stdin; add data.values or data.columns+rows to the spec, or pipe a data document"
        );
    }

    #[test]
    fn declared_field_type_mapping() {
        // Quantitative spellings.
        for t in [
            "INT64",
            "INTEGER",
            "INT",
            "SMALLINT",
            "BIGINT",
            "FLOAT64",
            "FLOAT",
            "DOUBLE",
            "NUMERIC",
            "BIGNUMERIC",
            "DECIMAL",
            "REAL",
        ] {
            assert_eq!(declared_field_type(t), FieldType::Quantitative, "{t}");
        }
        // Date/time spellings map to ordinal by design (see `declared_field_type`).
        for t in ["DATE", "DATETIME", "TIMESTAMP", "TIME"] {
            assert_eq!(declared_field_type(t), FieldType::Ordinal, "{t}");
        }
        // Strings and unknowns fall back to nominal.
        assert_eq!(declared_field_type("STRING"), FieldType::Nominal);
        assert_eq!(declared_field_type("BOOL"), FieldType::Nominal);
        assert_eq!(declared_field_type("whatever"), FieldType::Nominal);
        // Case-insensitive.
        assert_eq!(declared_field_type("int64"), FieldType::Quantitative);
        assert_eq!(declared_field_type("Date"), FieldType::Ordinal);
        assert_eq!(declared_field_type("String"), FieldType::Nominal);
    }
}