Skip to main content

polyglot_sql/
openlineage.rs

1//! OpenLineage-compatible payload generation.
2//!
3//! This module only builds OpenLineage JSON-compatible structures from SQL
4//! analysis. It deliberately does not implement transports, clients, retries,
5//! buffering, or runtime lifecycle management.
6
7use crate::dialects::{Dialect, DialectType};
8use crate::expressions::*;
9use crate::lineage::{self, LineageNode};
10use crate::schema::Schema;
11use crate::scope::SourceKind;
12use crate::traversal::ExpressionWalk;
13use crate::{mapping_schema_from_validation_schema, Error, Result, ValidationSchema};
14use serde::de::{self, Deserializer};
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use std::collections::{BTreeMap, BTreeSet, HashSet};
18
/// Spec document URL written to the `schemaURL` key of generated events.
pub const OPENLINEAGE_SCHEMA_URL: &str = "https://openlineage.io/spec/2-0-2/OpenLineage.json";

/// `_schemaURL` value of the column lineage dataset facet.
pub const COLUMN_LINEAGE_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json";

/// `_schemaURL` value of the `sql` job facet.
pub const SQL_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-1-0/SQLJobFacet.json";

/// `_schemaURL` value of the `jobType` job facet.
pub const JOB_TYPE_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json";

/// `_schemaURL` value of the `schema` dataset facet.
pub const SCHEMA_DATASET_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/SchemaDatasetFacet.json";
28
29/// Dataset identity in OpenLineage (`namespace`, `name`).
30#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
31#[serde(rename_all = "camelCase")]
32pub struct OpenLineageDatasetId {
33    pub namespace: String,
34    pub name: String,
35}
36
37impl OpenLineageDatasetId {
38    pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
39        Self {
40            namespace: namespace.into(),
41            name: name.into(),
42        }
43    }
44}
45
46/// Options shared by OpenLineage payload generation helpers.
47#[derive(Debug, Clone, Serialize, Deserialize, Default)]
48#[serde(rename_all = "camelCase", default)]
49pub struct OpenLineageOptions {
50    #[serde(deserialize_with = "deserialize_dialect_type")]
51    pub dialect: DialectType,
52    pub producer: String,
53    pub dataset_namespace: Option<String>,
54    pub dataset_mappings: BTreeMap<String, OpenLineageDatasetId>,
55    pub output_dataset: Option<OpenLineageDatasetId>,
56    pub schema: Option<ValidationSchema>,
57    pub job_namespace: Option<String>,
58    pub job_name: Option<String>,
59    pub event_time: Option<String>,
60    pub run_id: Option<String>,
61    pub event_type: Option<OpenLineageRunEventType>,
62}
63
64/// OpenLineage run event type.
65#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
66#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
67pub enum OpenLineageRunEventType {
68    Start,
69    Running,
70    Complete,
71    Abort,
72    Fail,
73    Other,
74}
75
76/// Non-fatal issue encountered while generating OpenLineage output.
77#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
78#[serde(rename_all = "camelCase")]
79pub struct OpenLineageWarning {
80    pub code: String,
81    pub message: String,
82}
83
84impl OpenLineageWarning {
85    fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
86        Self {
87            code: code.into(),
88            message: message.into(),
89        }
90    }
91}
92
93#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
94#[serde(rename_all = "camelCase")]
95pub struct OpenLineageColumnLineageResult {
96    pub facet: ColumnLineageDatasetFacet,
97    pub inputs: Vec<OpenLineageDataset>,
98    pub outputs: Vec<OpenLineageDataset>,
99    pub warnings: Vec<OpenLineageWarning>,
100}
101
102#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
103#[serde(rename_all = "camelCase")]
104pub struct OpenLineageEventResult {
105    pub event: Value,
106    pub warnings: Vec<OpenLineageWarning>,
107}
108
109#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
110pub struct OpenLineageDataset {
111    pub namespace: String,
112    pub name: String,
113    #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
114    pub facets: BTreeMap<String, Value>,
115}
116
117impl std::convert::From<OpenLineageDatasetId> for OpenLineageDataset {
118    fn from(id: OpenLineageDatasetId) -> Self {
119        Self {
120            namespace: id.namespace,
121            name: id.name,
122            facets: BTreeMap::new(),
123        }
124    }
125}
126
127#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
128pub struct ColumnLineageDatasetFacet {
129    #[serde(rename = "_producer")]
130    pub producer: String,
131    #[serde(rename = "_schemaURL")]
132    pub schema_url: String,
133    pub fields: BTreeMap<String, ColumnLineageField>,
134}
135
136#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
137#[serde(rename_all = "camelCase")]
138pub struct ColumnLineageField {
139    pub input_fields: Vec<OpenLineageInputField>,
140}
141
142#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
143#[serde(rename_all = "camelCase")]
144pub struct OpenLineageInputField {
145    pub namespace: String,
146    pub name: String,
147    pub field: String,
148    #[serde(skip_serializing_if = "Vec::is_empty", default)]
149    pub transformations: Vec<OpenLineageTransformation>,
150}
151
152#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
153pub struct OpenLineageTransformation {
154    #[serde(rename = "type")]
155    pub type_: String,
156    pub subtype: String,
157    #[serde(skip_serializing_if = "Option::is_none")]
158    pub description: Option<String>,
159    #[serde(skip_serializing_if = "Option::is_none")]
160    pub masking: Option<bool>,
161}
162
163#[derive(Debug, Clone)]
164struct StatementAnalysis {
165    query: Expression,
166    inputs: Vec<OpenLineageDatasetId>,
167    output: OpenLineageDatasetId,
168    output_column_names: Vec<String>,
169}
170
171#[derive(Debug, Clone)]
172struct OutputField {
173    name: String,
174    lineage_name: String,
175    expression: Option<Expression>,
176    star_source_table: Option<String>,
177}
178
/// Leaf of a lineage tree: a column of a source table.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct TerminalField {
    /// Source table name.
    table: String,
    /// Column name inside `table`.
    field: String,
}
184
185/// Produce a standalone OpenLineage columnLineage facet plus inferred datasets.
186pub fn openlineage_column_lineage(
187    sql: &str,
188    options: &OpenLineageOptions,
189) -> Result<OpenLineageColumnLineageResult> {
190    validate_common_options(options)?;
191
192    let mut warnings = Vec::new();
193    let schema_mapping = options
194        .schema
195        .as_ref()
196        .map(mapping_schema_from_validation_schema);
197    let dialect = Dialect::get(options.dialect);
198    let mut expressions = dialect.parse(sql)?;
199    if expressions.len() != 1 {
200        return Err(Error::parse(
201            format!(
202                "OpenLineage generation expects exactly one statement, found {}",
203                expressions.len()
204            ),
205            0,
206            0,
207            0,
208            0,
209        ));
210    }
211
212    let expr = expressions.remove(0);
213    let analysis = analyze_statement(&expr, options, &mut warnings)?;
214    let mut output_fields = output_fields_for_query(
215        &analysis.query,
216        schema_mapping.as_ref().map(|s| s as &dyn Schema),
217        options.dialect,
218        &mut warnings,
219    )?;
220    apply_output_column_names(
221        &mut output_fields,
222        &analysis.output_column_names,
223        &mut warnings,
224    );
225
226    let mut fields = BTreeMap::new();
227    for output_field in output_fields {
228        if fields.contains_key(&output_field.name) {
229            warnings.push(OpenLineageWarning::new(
230                "W_DUPLICATE_OUTPUT_FIELD",
231                format!(
232                    "Duplicate output field '{}' was merged in the OpenLineage fields map",
233                    output_field.name
234                ),
235            ));
236        }
237
238        let input_fields = input_fields_for_output(
239            &analysis.query,
240            &output_field,
241            options,
242            schema_mapping.as_ref().map(|s| s as &dyn Schema),
243            &mut warnings,
244        )?;
245
246        fields.insert(output_field.name, ColumnLineageField { input_fields });
247    }
248
249    let mut outputs = vec![OpenLineageDataset::from(analysis.output.clone())];
250    attach_output_facets(&mut outputs[0], &analysis.output, options, &fields)?;
251
252    Ok(OpenLineageColumnLineageResult {
253        facet: ColumnLineageDatasetFacet {
254            producer: options.producer.clone(),
255            schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
256            fields,
257        },
258        inputs: analysis
259            .inputs
260            .into_iter()
261            .map(OpenLineageDataset::from)
262            .collect(),
263        outputs,
264        warnings,
265    })
266}
267
268/// Produce an OpenLineage JobEvent as JSON.
269pub fn openlineage_job_event(
270    sql: &str,
271    options: &OpenLineageOptions,
272) -> Result<OpenLineageEventResult> {
273    let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
274    let job_name = required_option(&options.job_name, "jobName")?;
275    let event_time = required_option(&options.event_time, "eventTime")?;
276
277    let result = openlineage_column_lineage(sql, options)?;
278    let event = json!({
279        "eventTime": event_time,
280        "producer": options.producer,
281        "schemaURL": OPENLINEAGE_SCHEMA_URL,
282        "job": {
283            "namespace": job_namespace,
284            "name": job_name,
285            "facets": job_facets(sql, options),
286        },
287        "inputs": result.inputs,
288        "outputs": result.outputs,
289    });
290
291    Ok(OpenLineageEventResult {
292        event,
293        warnings: result.warnings,
294    })
295}
296
297/// Produce an OpenLineage RunEvent as JSON.
298pub fn openlineage_run_event(
299    sql: &str,
300    options: &OpenLineageOptions,
301) -> Result<OpenLineageEventResult> {
302    let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
303    let job_name = required_option(&options.job_name, "jobName")?;
304    let event_time = required_option(&options.event_time, "eventTime")?;
305    let run_id = required_option(&options.run_id, "runId")?;
306    let event_type = options
307        .event_type
308        .ok_or_else(|| Error::parse("Missing required option: eventType", 0, 0, 0, 0))?;
309
310    let result = openlineage_column_lineage(sql, options)?;
311    let event = json!({
312        "eventTime": event_time,
313        "eventType": event_type,
314        "producer": options.producer,
315        "schemaURL": OPENLINEAGE_SCHEMA_URL,
316        "run": {
317            "runId": run_id,
318            "facets": {},
319        },
320        "job": {
321            "namespace": job_namespace,
322            "name": job_name,
323            "facets": job_facets(sql, options),
324        },
325        "inputs": result.inputs,
326        "outputs": result.outputs,
327    });
328
329    Ok(OpenLineageEventResult {
330        event,
331        warnings: result.warnings,
332    })
333}
334
335fn validate_common_options(options: &OpenLineageOptions) -> Result<()> {
336    if options.producer.trim().is_empty() {
337        return Err(Error::parse(
338            "Missing required option: producer",
339            0,
340            0,
341            0,
342            0,
343        ));
344    }
345    Ok(())
346}
347
348fn required_option(value: &Option<String>, name: &str) -> Result<String> {
349    match value.as_ref().filter(|v| !v.trim().is_empty()) {
350        Some(value) => Ok(value.clone()),
351        None => Err(Error::parse(
352            format!("Missing required option: {name}"),
353            0,
354            0,
355            0,
356            0,
357        )),
358    }
359}
360
361fn analyze_statement(
362    expr: &Expression,
363    options: &OpenLineageOptions,
364    warnings: &mut Vec<OpenLineageWarning>,
365) -> Result<StatementAnalysis> {
366    match expr {
367        Expression::Prepare(prepare) => analyze_statement(&prepare.statement, options, warnings),
368        Expression::Select(select) => {
369            let output = if let Some(into) = &select.into {
370                dataset_from_expression(&into.this, options)?
371            } else {
372                options.output_dataset.clone().ok_or_else(|| {
373                    Error::parse(
374                        "OpenLineage outputDataset is required for SELECT statements without SELECT INTO",
375                        0,
376                        0,
377                        0,
378                        0,
379                    )
380                })?
381            };
382            Ok(StatementAnalysis {
383                query: expr.clone(),
384                inputs: collect_input_datasets(expr, options, Some(&output), warnings)?,
385                output,
386                output_column_names: Vec::new(),
387            })
388        }
389        Expression::Insert(insert) => {
390            let output = dataset_from_table_ref(&insert.table, options)?;
391            let query = insert.query.clone().ok_or_else(|| {
392                Error::unsupported(
393                    "OpenLineage column lineage for INSERT without query",
394                    options.dialect.to_string(),
395                )
396            })?;
397            Ok(StatementAnalysis {
398                inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
399                query,
400                output,
401                output_column_names: insert.columns.iter().map(|col| col.name.clone()).collect(),
402            })
403        }
404        Expression::CreateTable(create) => {
405            let output = dataset_from_table_ref(&create.name, options)?;
406            let query = create.as_select.clone().ok_or_else(|| {
407                Error::unsupported(
408                    "OpenLineage column lineage for CREATE TABLE without AS SELECT",
409                    options.dialect.to_string(),
410                )
411            })?;
412            Ok(StatementAnalysis {
413                inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
414                query,
415                output,
416                output_column_names: create
417                    .columns
418                    .iter()
419                    .map(|col| col.name.name.clone())
420                    .collect(),
421            })
422        }
423        _ => Err(Error::unsupported(
424            format!("OpenLineage generation for {}", expr.variant_name()),
425            options.dialect.to_string(),
426        )),
427    }
428}
429
430fn output_fields_for_query(
431    query: &Expression,
432    schema: Option<&dyn Schema>,
433    dialect: DialectType,
434    warnings: &mut Vec<OpenLineageWarning>,
435) -> Result<Vec<OutputField>> {
436    let select = leftmost_select(query).ok_or_else(|| {
437        Error::unsupported(
438            "OpenLineage output field extraction for non-SELECT query",
439            dialect.to_string(),
440        )
441    })?;
442
443    let mut fields = Vec::new();
444    for (idx, expr) in select.expressions.iter().enumerate() {
445        if is_star_expr(expr) {
446            expand_star_output_fields(select, expr, schema, warnings, &mut fields);
447            continue;
448        }
449
450        let name = output_name(expr).unwrap_or_else(|| format!("_{idx}"));
451        fields.push(OutputField {
452            lineage_name: name.clone(),
453            name,
454            expression: Some(expr.clone()),
455            star_source_table: None,
456        });
457    }
458    Ok(fields)
459}
460
461fn apply_output_column_names(
462    fields: &mut [OutputField],
463    output_column_names: &[String],
464    warnings: &mut Vec<OpenLineageWarning>,
465) {
466    if output_column_names.is_empty() {
467        return;
468    }
469    if output_column_names.len() != fields.len() {
470        warnings.push(OpenLineageWarning::new(
471            "W_OUTPUT_COLUMN_COUNT_MISMATCH",
472            format!(
473                "Target column count ({}) does not match projected column count ({})",
474                output_column_names.len(),
475                fields.len()
476            ),
477        ));
478        return;
479    }
480    for (field, output_name) in fields.iter_mut().zip(output_column_names) {
481        field.name = output_name.clone();
482    }
483}
484
485fn input_fields_for_output(
486    query: &Expression,
487    output_field: &OutputField,
488    options: &OpenLineageOptions,
489    schema: Option<&dyn Schema>,
490    warnings: &mut Vec<OpenLineageWarning>,
491) -> Result<Vec<OpenLineageInputField>> {
492    if let Some(table) = &output_field.star_source_table {
493        return terminal_fields_to_openlineage(
494            vec![TerminalField {
495                table: table.clone(),
496                field: output_field.lineage_name.clone(),
497            }],
498            "IDENTITY",
499            Some(format!("SELECT {}", output_field.lineage_name)),
500            options,
501            warnings,
502        );
503    }
504
505    let lineage_result = if let Some(schema) = schema {
506        lineage::lineage_with_schema(
507            &output_field.lineage_name,
508            query,
509            Some(schema),
510            Some(options.dialect),
511            false,
512        )
513    } else {
514        lineage::lineage(
515            &output_field.lineage_name,
516            query,
517            Some(options.dialect),
518            false,
519        )
520    };
521
522    let node = match lineage_result {
523        Ok(node) => node,
524        Err(err) => {
525            warnings.push(OpenLineageWarning::new(
526                "W_UNRESOLVED_OUTPUT_FIELD",
527                format!(
528                    "Could not resolve lineage for output field '{}': {}",
529                    output_field.name, err
530                ),
531            ));
532            return Ok(Vec::new());
533        }
534    };
535
536    let mut terminals = BTreeSet::new();
537    collect_terminal_fields(&node, &mut terminals);
538    let terminals: Vec<TerminalField> = terminals.into_iter().collect();
539
540    if terminals.is_empty() {
541        if has_virtual_terminal(&node) {
542            return Ok(Vec::new());
543        }
544        warnings.push(OpenLineageWarning::new(
545            "W_EMPTY_FIELD_LINEAGE",
546            format!(
547                "No input fields were found for output field '{}'",
548                output_field.name
549            ),
550        ));
551        return Ok(Vec::new());
552    }
553
554    let subtype = transformation_subtype(output_field.expression.as_ref(), &terminals);
555    let description = output_field
556        .expression
557        .as_ref()
558        .and_then(|expr| transformation_description(expr, options.dialect));
559
560    terminal_fields_to_openlineage(terminals, subtype, description, options, warnings)
561}
562
563fn transformation_description(expr: &Expression, dialect: DialectType) -> Option<String> {
564    #[cfg(feature = "generate")]
565    {
566        Some(expr.sql_for(dialect))
567    }
568
569    #[cfg(not(feature = "generate"))]
570    {
571        let _ = (expr, dialect);
572        None
573    }
574}
575
576fn terminal_fields_to_openlineage(
577    terminals: Vec<TerminalField>,
578    subtype: &str,
579    description: Option<String>,
580    options: &OpenLineageOptions,
581    warnings: &mut Vec<OpenLineageWarning>,
582) -> Result<Vec<OpenLineageInputField>> {
583    let mut result = Vec::new();
584    for terminal in terminals {
585        let dataset = dataset_from_table_name(&terminal.table, options).map_err(|err| {
586            warnings.push(OpenLineageWarning::new(
587                "W_UNRESOLVED_DATASET",
588                format!(
589                    "Could not map table '{}' to an OpenLineage dataset: {}",
590                    terminal.table, err
591                ),
592            ));
593            err
594        })?;
595        result.push(OpenLineageInputField {
596            namespace: dataset.namespace,
597            name: dataset.name,
598            field: terminal.field,
599            transformations: vec![OpenLineageTransformation {
600                type_: "DIRECT".to_string(),
601                subtype: subtype.to_string(),
602                description: description.clone(),
603                masking: Some(false),
604            }],
605        });
606    }
607    Ok(result)
608}
609
610fn transformation_subtype(expr: Option<&Expression>, terminals: &[TerminalField]) -> &'static str {
611    let Some(expr) = expr else {
612        return "TRANSFORMATION";
613    };
614    let unaliased = unalias(expr);
615    if expression_contains_aggregate(unaliased) {
616        return "AGGREGATION";
617    }
618    if terminals.len() == 1 {
619        if let Expression::Column(col) = unaliased {
620            if col.name.name == terminals[0].field {
621                return "IDENTITY";
622            }
623        }
624    }
625    "TRANSFORMATION"
626}
627
628fn collect_terminal_fields(node: &LineageNode, terminals: &mut BTreeSet<TerminalField>) {
629    if node.downstream.is_empty() {
630        if node.source_kind == SourceKind::Virtual {
631            return;
632        }
633        if let Expression::Column(column) = &node.expression {
634            let table = if !node.source_name.is_empty() {
635                Some(node.source_name.clone())
636            } else if let Expression::Table(table) = &node.source {
637                Some(table_ref_qualified_name(table))
638            } else {
639                column.table.as_ref().map(|t| t.name.clone())
640            };
641            if let Some(table) = table.filter(|t| !t.is_empty()) {
642                terminals.insert(TerminalField {
643                    table,
644                    field: column.name.name.clone(),
645                });
646            }
647        }
648        return;
649    }
650
651    for child in &node.downstream {
652        collect_terminal_fields(child, terminals);
653    }
654}
655
656fn has_virtual_terminal(node: &LineageNode) -> bool {
657    if node.downstream.is_empty() {
658        return node.source_kind == SourceKind::Virtual;
659    }
660    node.downstream.iter().any(has_virtual_terminal)
661}
662
663fn expression_contains_aggregate(expr: &Expression) -> bool {
664    expr.contains(|node| {
665        matches!(
666            node,
667            Expression::AggregateFunction(_)
668                | Expression::Sum(_)
669                | Expression::Count(_)
670                | Expression::Avg(_)
671                | Expression::Min(_)
672                | Expression::Max(_)
673                | Expression::GroupConcat(_)
674                | Expression::StringAgg(_)
675                | Expression::ListAgg(_)
676                | Expression::ArrayAgg(_)
677                | Expression::CountIf(_)
678                | Expression::SumIf(_)
679                | Expression::Stddev(_)
680                | Expression::StddevPop(_)
681                | Expression::StddevSamp(_)
682                | Expression::Variance(_)
683                | Expression::VarPop(_)
684                | Expression::VarSamp(_)
685                | Expression::Median(_)
686                | Expression::Mode(_)
687                | Expression::First(_)
688                | Expression::Last(_)
689                | Expression::AnyValue(_)
690                | Expression::ApproxDistinct(_)
691                | Expression::ApproxCountDistinct(_)
692                | Expression::ApproxPercentile(_)
693                | Expression::Percentile(_)
694                | Expression::LogicalAnd(_)
695                | Expression::LogicalOr(_)
696                | Expression::Skewness(_)
697                | Expression::BitwiseCount(_)
698                | Expression::ArrayConcatAgg(_)
699                | Expression::ArrayUniqueAgg(_)
700                | Expression::BoolXorAgg(_)
701                | Expression::ParameterizedAgg(_)
702                | Expression::ArgMax(_)
703                | Expression::ArgMin(_)
704                | Expression::ApproxTopK(_)
705                | Expression::ApproxTopKAccumulate(_)
706                | Expression::ApproxTopKCombine(_)
707                | Expression::ApproxTopKEstimate(_)
708                | Expression::ApproxTopSum(_)
709                | Expression::ApproxQuantiles(_)
710                | Expression::Grouping(_)
711                | Expression::GroupingId(_)
712                | Expression::AnonymousAggFunc(_)
713                | Expression::CombinedAggFunc(_)
714                | Expression::CombinedParameterizedAgg(_)
715                | Expression::HashAgg(_)
716                | Expression::ObjectAgg(_)
717                | Expression::AIAgg(_)
718        )
719    })
720}
721
722fn collect_input_datasets(
723    expr: &Expression,
724    options: &OpenLineageOptions,
725    output: Option<&OpenLineageDatasetId>,
726    warnings: &mut Vec<OpenLineageWarning>,
727) -> Result<Vec<OpenLineageDatasetId>> {
728    let cte_aliases = collect_cte_aliases(expr, options.dialect);
729    let mut seen = BTreeSet::new();
730    let mut result = Vec::new();
731
732    for table in expr.dfs().filter_map(|node| match node {
733        Expression::Table(table) => Some(table),
734        _ => None,
735    }) {
736        let qname = table_ref_qualified_name(table);
737        let normalized = normalize_identifier(&table.name.name, options.dialect, true);
738        if cte_aliases.contains(&normalized) {
739            continue;
740        }
741        if output
742            .map(|out| out.name == qname || out.name == table.name.name)
743            .unwrap_or(false)
744        {
745            continue;
746        }
747        match dataset_from_table_name(&qname, options) {
748            Ok(dataset) => {
749                if seen.insert((dataset.namespace.clone(), dataset.name.clone())) {
750                    result.push(dataset);
751                }
752            }
753            Err(err) => warnings.push(OpenLineageWarning::new(
754                "W_UNRESOLVED_DATASET",
755                format!("Could not map input table '{qname}': {err}"),
756            )),
757        }
758    }
759
760    Ok(result)
761}
762
763fn attach_output_facets(
764    output: &mut OpenLineageDataset,
765    output_id: &OpenLineageDatasetId,
766    options: &OpenLineageOptions,
767    fields: &BTreeMap<String, ColumnLineageField>,
768) -> Result<()> {
769    let column_lineage = ColumnLineageDatasetFacet {
770        producer: options.producer.clone(),
771        schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
772        fields: fields.clone(),
773    };
774    output.facets.insert(
775        "columnLineage".to_string(),
776        serde_json::to_value(column_lineage).map_err(openlineage_serialization_error)?,
777    );
778
779    if let Some(schema_facet) = schema_facet_for_dataset(output_id, options) {
780        output.facets.insert(
781            "schema".to_string(),
782            serde_json::to_value(schema_facet).map_err(openlineage_serialization_error)?,
783        );
784    }
785
786    Ok(())
787}
788
789fn job_facets(sql: &str, options: &OpenLineageOptions) -> Value {
790    json!({
791        "sql": {
792            "_producer": options.producer,
793            "_schemaURL": SQL_JOB_FACET_SCHEMA_URL,
794            "query": sql,
795            "dialect": options.dialect.to_string(),
796        },
797        "jobType": {
798            "_producer": options.producer,
799            "_schemaURL": JOB_TYPE_JOB_FACET_SCHEMA_URL,
800            "processingType": "BATCH",
801            "integration": "POLYGLOT_SQL",
802            "jobType": "QUERY",
803        }
804    })
805}
806
807#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
808struct SchemaDatasetFacet {
809    #[serde(rename = "_producer")]
810    producer: String,
811    #[serde(rename = "_schemaURL")]
812    schema_url: String,
813    fields: Vec<SchemaDatasetFacetField>,
814}
815
816#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
817struct SchemaDatasetFacetField {
818    name: String,
819    #[serde(skip_serializing_if = "String::is_empty", default)]
820    #[serde(rename = "type")]
821    data_type: String,
822    #[serde(skip_serializing_if = "Option::is_none")]
823    ordinal_position: Option<usize>,
824}
825
826fn schema_facet_for_dataset(
827    output: &OpenLineageDatasetId,
828    options: &OpenLineageOptions,
829) -> Option<SchemaDatasetFacet> {
830    let schema = options.schema.as_ref()?;
831    let table = schema.tables.iter().find(|table| {
832        let qname = if let Some(schema_name) = &table.schema {
833            format!("{}.{}", schema_name, table.name)
834        } else {
835            table.name.clone()
836        };
837        output.name == table.name || output.name == qname
838    })?;
839
840    Some(SchemaDatasetFacet {
841        producer: options.producer.clone(),
842        schema_url: SCHEMA_DATASET_FACET_SCHEMA_URL.to_string(),
843        fields: table
844            .columns
845            .iter()
846            .enumerate()
847            .map(|(idx, col)| SchemaDatasetFacetField {
848                name: col.name.clone(),
849                data_type: col.data_type.clone(),
850                ordinal_position: Some(idx + 1),
851            })
852            .collect(),
853    })
854}
855
856fn expand_star_output_fields(
857    select: &Select,
858    star_expr: &Expression,
859    schema: Option<&dyn Schema>,
860    warnings: &mut Vec<OpenLineageWarning>,
861    fields: &mut Vec<OutputField>,
862) {
863    let Some(schema) = schema else {
864        warnings.push(OpenLineageWarning::new(
865            "W_STAR_WITHOUT_SCHEMA",
866            "SELECT * cannot be expanded into OpenLineage column lineage without schema metadata",
867        ));
868        return;
869    };
870
871    let qualifier = star_qualifier(star_expr);
872    let sources = select_source_tables(select);
873    for (alias, qname) in sources {
874        if qualifier
875            .as_ref()
876            .map(|q| q != &alias && q != &qname)
877            .unwrap_or(false)
878        {
879            continue;
880        }
881        match schema.column_names(&qname) {
882            Ok(columns) => {
883                for name in columns {
884                    fields.push(OutputField {
885                        lineage_name: name.clone(),
886                        name,
887                        expression: None,
888                        star_source_table: Some(qname.clone()),
889                    });
890                }
891            }
892            Err(err) => warnings.push(OpenLineageWarning::new(
893                "W_STAR_SCHEMA_LOOKUP_FAILED",
894                format!("Could not expand SELECT * for table '{}': {}", qname, err),
895            )),
896        }
897    }
898}
899
900fn select_source_tables(select: &Select) -> Vec<(String, String)> {
901    let mut result = Vec::new();
902    if let Some(from) = &select.from {
903        for expr in &from.expressions {
904            collect_source_table(expr, &mut result);
905        }
906    }
907    for join in &select.joins {
908        collect_source_table(&join.this, &mut result);
909    }
910    result
911}
912
913fn collect_source_table(expr: &Expression, result: &mut Vec<(String, String)>) {
914    match expr {
915        Expression::Table(table) => {
916            let qname = table_ref_qualified_name(table);
917            let alias = table
918                .alias
919                .as_ref()
920                .map(|a| a.name.clone())
921                .unwrap_or_else(|| table.name.name.clone());
922            result.push((alias, qname));
923        }
924        Expression::Alias(alias) => collect_source_table(&alias.this, result),
925        Expression::Paren(paren) => collect_source_table(&paren.this, result),
926        _ => {}
927    }
928}
929
930fn leftmost_select(expr: &Expression) -> Option<&Select> {
931    match expr {
932        Expression::Prepare(prepare) => leftmost_select(&prepare.statement),
933        Expression::Select(select) => Some(select),
934        Expression::Union(union) => leftmost_select(&union.left),
935        Expression::Intersect(intersect) => leftmost_select(&intersect.left),
936        Expression::Except(except) => leftmost_select(&except.left),
937        Expression::Subquery(subquery) => leftmost_select(&subquery.this),
938        _ => None,
939    }
940}
941
942fn output_name(expr: &Expression) -> Option<String> {
943    match expr {
944        Expression::Alias(alias) => Some(alias.alias.name.clone()),
945        Expression::Column(col) => Some(col.name.name.clone()),
946        Expression::Identifier(id) => Some(id.name.clone()),
947        Expression::Annotated(a) => output_name(&a.this),
948        _ => None,
949    }
950}
951
952fn unalias(expr: &Expression) -> &Expression {
953    match expr {
954        Expression::Alias(alias) => &alias.this,
955        Expression::Annotated(a) => unalias(&a.this),
956        _ => expr,
957    }
958}
959
960fn is_star_expr(expr: &Expression) -> bool {
961    matches!(expr, Expression::Star(_))
962        || matches!(expr, Expression::Column(col) if col.name.name == "*")
963}
964
965fn star_qualifier(expr: &Expression) -> Option<String> {
966    match expr {
967        Expression::Star(star) => star.table.as_ref().map(|t| t.name.clone()),
968        Expression::Column(col) if col.name.name == "*" => {
969            col.table.as_ref().map(|t| t.name.clone())
970        }
971        _ => None,
972    }
973}
974
975fn dataset_from_expression(
976    expr: &Expression,
977    options: &OpenLineageOptions,
978) -> Result<OpenLineageDatasetId> {
979    match expr {
980        Expression::Table(table) => dataset_from_table_ref(table, options),
981        Expression::Identifier(id) => dataset_from_table_name(&id.name, options),
982        _ => Err(Error::unsupported(
983            "OpenLineage dataset extraction from non-table expression",
984            options.dialect.to_string(),
985        )),
986    }
987}
988
989fn dataset_from_table_ref(
990    table: &TableRef,
991    options: &OpenLineageOptions,
992) -> Result<OpenLineageDatasetId> {
993    dataset_from_table_name(&table_ref_qualified_name(table), options)
994}
995
996fn dataset_from_table_name(
997    table_name: &str,
998    options: &OpenLineageOptions,
999) -> Result<OpenLineageDatasetId> {
1000    if let Some(mapped) = options.dataset_mappings.get(table_name) {
1001        return Ok(mapped.clone());
1002    }
1003    let namespace = options.dataset_namespace.as_ref().ok_or_else(|| {
1004        Error::parse(
1005            format!(
1006                "Missing datasetNamespace or explicit dataset mapping for table '{}'",
1007                table_name
1008            ),
1009            0,
1010            0,
1011            0,
1012            0,
1013        )
1014    })?;
1015    Ok(OpenLineageDatasetId::new(namespace, table_name))
1016}
1017
1018fn table_ref_qualified_name(table: &TableRef) -> String {
1019    let mut parts = Vec::new();
1020    if let Some(catalog) = &table.catalog {
1021        parts.push(catalog.name.clone());
1022    }
1023    if let Some(schema) = &table.schema {
1024        parts.push(schema.name.clone());
1025    }
1026    parts.push(table.name.name.clone());
1027    parts.join(".")
1028}
1029
1030fn collect_cte_aliases(expr: &Expression, dialect: DialectType) -> HashSet<String> {
1031    let mut aliases = HashSet::new();
1032    for node in expr.dfs() {
1033        match node {
1034            Expression::Select(select) => {
1035                if let Some(with) = &select.with {
1036                    collect_with_aliases(with, dialect, &mut aliases);
1037                }
1038            }
1039            Expression::Union(union) => {
1040                if let Some(with) = &union.with {
1041                    collect_with_aliases(with, dialect, &mut aliases);
1042                }
1043            }
1044            Expression::Intersect(intersect) => {
1045                if let Some(with) = &intersect.with {
1046                    collect_with_aliases(with, dialect, &mut aliases);
1047                }
1048            }
1049            Expression::Except(except) => {
1050                if let Some(with) = &except.with {
1051                    collect_with_aliases(with, dialect, &mut aliases);
1052                }
1053            }
1054            _ => {}
1055        }
1056    }
1057    aliases
1058}
1059
1060fn collect_with_aliases(with: &With, dialect: DialectType, aliases: &mut HashSet<String>) {
1061    for cte in &with.ctes {
1062        aliases.insert(normalize_identifier(&cte.alias.name, dialect, true));
1063    }
1064}
1065
1066fn normalize_identifier(name: &str, dialect: DialectType, is_table: bool) -> String {
1067    crate::schema::normalize_name(name, Some(dialect), is_table, true)
1068}
1069
1070fn openlineage_serialization_error(err: serde_json::Error) -> Error {
1071    Error::internal(format!("OpenLineage serialization failed: {err}"))
1072}
1073
1074fn deserialize_dialect_type<'de, D>(deserializer: D) -> std::result::Result<DialectType, D::Error>
1075where
1076    D: Deserializer<'de>,
1077{
1078    let value = String::deserialize(deserializer)?;
1079    value.parse::<DialectType>().map_err(de::Error::custom)
1080}
1081
#[cfg(test)]
mod tests {
    use super::*;

    const POSTGRES_NAMESPACE: &str = "postgres://warehouse";
    const BIGQUERY_NAMESPACE: &str = "bigquery://warehouse";
    const RUN_ID: &str = "3b452093-782c-4ef2-9c0c-aafe2aa6f34d";

    /// Baseline PostgreSQL options shared by the tests; individual tests tweak a copy.
    fn options() -> OpenLineageOptions {
        let output = OpenLineageDatasetId::new(POSTGRES_NAMESPACE, "analytics.out");
        OpenLineageOptions {
            dialect: DialectType::PostgreSQL,
            producer: "https://github.com/tobilg/polyglot".to_string(),
            dataset_namespace: Some(POSTGRES_NAMESPACE.to_string()),
            output_dataset: Some(output),
            job_namespace: Some("polyglot-tests".to_string()),
            job_name: Some("lineage-test".to_string()),
            event_time: Some("2026-05-18T00:00:00Z".to_string()),
            run_id: Some(RUN_ID.to_string()),
            event_type: Some(OpenLineageRunEventType::Complete),
            ..Default::default()
        }
    }

    /// Baseline options with the output dataset removed, so it has to be inferred.
    fn options_without_output() -> OpenLineageOptions {
        let mut opts = options();
        opts.output_dataset = None;
        opts
    }

    /// Baseline options switched to BigQuery, writing to `output_name` in the BigQuery
    /// warehouse namespace.
    fn bigquery_options(output_name: &str) -> OpenLineageOptions {
        let mut opts = options();
        opts.dialect = DialectType::BigQuery;
        opts.dataset_namespace = Some(BIGQUERY_NAMESPACE.to_string());
        opts.output_dataset = Some(OpenLineageDatasetId::new(BIGQUERY_NAMESPACE, output_name));
        opts
    }

    /// Schema column fixture with every optional attribute left at its neutral value.
    fn schema_column(name: &str, data_type: &str) -> crate::validation::SchemaColumn {
        crate::validation::SchemaColumn {
            name: name.to_string(),
            data_type: data_type.to_string(),
            nullable: None,
            primary_key: false,
            unique: false,
            references: None,
        }
    }

    #[test]
    fn deserializes_dialect_aliases_in_options() {
        let raw = r#"{"producer":"polyglot","dialect":"postgres"}"#;
        let parsed: OpenLineageOptions = serde_json::from_str(raw).expect("options");
        assert_eq!(parsed.dialect, DialectType::PostgreSQL);
    }

    #[test]
    fn emits_identity_column_lineage_for_select() {
        let analysis = openlineage_column_lineage("SELECT a FROM t", &options()).expect("lineage");
        let inputs = &analysis.facet.fields.get("a").expect("field a").input_fields;
        assert_eq!(inputs.len(), 1);

        let input = &inputs[0];
        assert_eq!(input.name, "t");
        assert_eq!(input.field, "a");
        assert_eq!(input.transformations[0].subtype, "IDENTITY");
    }

    #[test]
    fn emits_column_lineage_for_prepared_statement_body() {
        let sql = "PREPARE leak AS SELECT id FROM sensitive_table WHERE id = $1";
        let analysis = openlineage_column_lineage(sql, &options()).expect("lineage");
        let inputs = &analysis
            .facet
            .fields
            .get("id")
            .expect("field id")
            .input_fields;
        assert_eq!(inputs.len(), 1);
        assert_eq!(inputs[0].name, "sensitive_table");
        assert_eq!(inputs[0].field, "id");
    }

    #[test]
    fn resolves_input_dataset_behind_table_alias() {
        let analysis = openlineage_column_lineage("SELECT o.total FROM orders o", &options())
            .expect("lineage");
        let first = &analysis
            .facet
            .fields
            .get("total")
            .expect("field total")
            .input_fields[0];
        assert_eq!(first.name, "orders");
        assert_eq!(first.field, "total");
    }

    #[test]
    fn emits_transformation_column_lineage_for_expression() {
        let analysis =
            openlineage_column_lineage("SELECT a + b AS c FROM t", &options()).expect("lineage");
        let inputs = &analysis.facet.fields.get("c").expect("field c").input_fields;
        assert_eq!(inputs.len(), 2);
        for wanted in ["a", "b"] {
            assert!(inputs.iter().any(|f| f.field == wanted));
        }
        assert!(inputs
            .iter()
            .all(|f| f.transformations[0].subtype == "TRANSFORMATION"));
    }

    #[test]
    fn omits_bigquery_safe_namespace_from_column_lineage_issue207() {
        // Only the dialect changes here; namespace and output dataset stay at the baseline.
        let mut opts = options();
        opts.dialect = DialectType::BigQuery;

        let sql = r#"
WITH import_cte AS (
  SELECT timestamp, data, operation
  FROM `project`.`dataset`.`source_table`
),
transform_cte AS (
  SELECT
    timestamp,
    SAFE.PARSE_JSON(data) AS json_data
  FROM import_cte
)
SELECT json_data FROM transform_cte
"#;
        let analysis = openlineage_column_lineage(sql, &opts).expect("lineage");
        let json_data = analysis.facet.fields.get("json_data").expect("json_data");

        let has_data_input = json_data
            .input_fields
            .iter()
            .any(|input| input.field == "data");
        assert!(
            has_data_input,
            "expected data input field, got {:?}",
            json_data.input_fields
        );

        let has_safe_input = json_data
            .input_fields
            .iter()
            .any(|input| input.field.eq_ignore_ascii_case("safe"));
        assert!(
            !has_safe_input,
            "did not expect SAFE namespace as input field, got {:?}",
            json_data.input_fields
        );
    }

    #[test]
    fn emits_bigquery_unnest_alias_column_lineage_issue209() {
        let opts = bigquery_options("calendar");

        let sql = r#"
SELECT date_val AS week_start
FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
"#;
        let analysis = openlineage_column_lineage(sql, &opts).expect("lineage");
        let week_start = analysis.facet.fields.get("week_start").expect("week_start");

        assert!(week_start.input_fields.is_empty());

        let no_empty_lineage_warning = analysis
            .warnings
            .iter()
            .all(|warning| warning.code != "W_EMPTY_FIELD_LINEAGE");
        assert!(
            no_empty_lineage_warning,
            "did not expect empty-lineage warning, got {:?}",
            analysis.warnings
        );
    }

    #[test]
    fn emits_bigquery_table_backed_unnest_column_lineage() {
        let opts = bigquery_options("items");

        let sql = r#"
SELECT item.item AS item
FROM t JOIN UNNEST(t.items) AS item ON TRUE
"#;
        let analysis = openlineage_column_lineage(sql, &opts).expect("lineage");
        let inputs = &analysis.facet.fields.get("item").expect("item").input_fields;

        assert_eq!(inputs.len(), 1);
        assert_eq!(inputs[0].name, "t");
        assert_eq!(inputs[0].field, "items");
    }

    #[test]
    fn emits_aggregation_column_lineage() {
        let analysis =
            openlineage_column_lineage("SELECT SUM(amount) AS total FROM orders", &options())
                .expect("lineage");
        let first = &analysis
            .facet
            .fields
            .get("total")
            .expect("field total")
            .input_fields[0];
        assert_eq!(first.field, "amount");
        assert_eq!(first.transformations[0].subtype, "AGGREGATION");
    }

    #[test]
    fn infers_insert_output_dataset() {
        let opts = options_without_output();
        let analysis =
            openlineage_column_lineage("INSERT INTO analytics.out SELECT a FROM raw.input", &opts)
                .expect("lineage");
        assert_eq!(analysis.outputs[0].name, "analytics.out");
        assert_eq!(analysis.inputs[0].name, "raw.input");
    }

    #[test]
    fn maps_insert_target_columns_to_output_fields() {
        let opts = options_without_output();
        let sql = "INSERT INTO analytics.out (target_a) SELECT source_a FROM raw.input";
        let analysis = openlineage_column_lineage(sql, &opts).expect("lineage");

        let target = analysis
            .facet
            .fields
            .get("target_a")
            .expect("target field");
        assert_eq!(target.input_fields[0].field, "source_a");
        assert!(!analysis.facet.fields.contains_key("source_a"));
    }

    #[test]
    fn pure_select_requires_output_dataset() {
        let opts = options_without_output();
        let err = openlineage_column_lineage("SELECT a FROM t", &opts).unwrap_err();
        let message = err.to_string();
        assert!(message.contains("outputDataset is required"));
    }

    #[test]
    fn emits_job_event_payload() {
        let payload = openlineage_job_event("SELECT a FROM t", &options()).expect("event");
        let job = &payload.event["job"];
        assert_eq!(job["namespace"], "polyglot-tests");
        assert_eq!(job["facets"]["sql"]["_schemaURL"], SQL_JOB_FACET_SCHEMA_URL);

        let column_lineage = &payload.event["outputs"][0]["facets"]["columnLineage"];
        assert_eq!(column_lineage["fields"]["a"]["inputFields"][0]["field"], "a");
    }

    #[test]
    fn emits_run_event_payload() {
        let payload = openlineage_run_event("SELECT a FROM t", &options()).expect("event");
        assert_eq!(payload.event["eventType"], "COMPLETE");
        assert_eq!(payload.event["run"]["runId"], RUN_ID);
    }

    #[test]
    fn select_star_without_schema_warns() {
        let analysis = openlineage_column_lineage("SELECT * FROM t", &options()).expect("lineage");
        assert!(analysis.facet.fields.is_empty());

        let warned = analysis
            .warnings
            .iter()
            .any(|w| w.code == "W_STAR_WITHOUT_SCHEMA");
        assert!(warned);
    }

    #[test]
    fn select_star_with_schema_expands_fields() {
        let table = crate::validation::SchemaTable {
            name: "t".to_string(),
            schema: None,
            columns: vec![schema_column("a", "INT"), schema_column("b", "TEXT")],
            aliases: vec![],
            primary_key: vec![],
            unique_keys: vec![],
            foreign_keys: vec![],
        };
        let mut opts = options();
        opts.schema = Some(ValidationSchema {
            strict: None,
            tables: vec![table],
        });

        let analysis = openlineage_column_lineage("SELECT * FROM t", &opts).expect("lineage");
        for expected in ["a", "b"] {
            assert!(analysis.facet.fields.contains_key(expected));
        }
    }
}
1364        let result = openlineage_column_lineage("SELECT * FROM t", &opts).expect("lineage");
1365        assert!(result.facet.fields.contains_key("a"));
1366        assert!(result.facet.fields.contains_key("b"));
1367    }
1368}