Skip to main content

polyglot_sql/
openlineage.rs

1//! OpenLineage-compatible payload generation.
2//!
3//! This module only builds OpenLineage JSON-compatible structures from SQL
4//! analysis. It deliberately does not implement transports, clients, retries,
5//! buffering, or runtime lifecycle management.
6
7use crate::dialects::{Dialect, DialectType};
8use crate::expressions::*;
9use crate::lineage::{self, LineageNode};
10use crate::schema::Schema;
11use crate::traversal::ExpressionWalk;
12use crate::{mapping_schema_from_validation_schema, Error, Result, ValidationSchema};
13use serde::de::{self, Deserializer};
14use serde::{Deserialize, Serialize};
15use serde_json::{json, Value};
16use std::collections::{BTreeMap, BTreeSet, HashSet};
17
/// Core OpenLineage spec schema URL, emitted as the top-level `schemaURL` of job and run events.
pub const OPENLINEAGE_SCHEMA_URL: &str = "https://openlineage.io/spec/2-0-2/OpenLineage.json";
/// Schema URL written as `_schemaURL` on the `columnLineage` dataset facet.
pub const COLUMN_LINEAGE_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json";
/// Schema URL written as `_schemaURL` on the `sql` job facet.
pub const SQL_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-1-0/SQLJobFacet.json";
/// Schema URL written as `_schemaURL` on the `jobType` job facet.
pub const JOB_TYPE_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json";
/// Schema URL written as `_schemaURL` on the output dataset's `schema` facet.
pub const SCHEMA_DATASET_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/SchemaDatasetFacet.json";
27
/// Dataset identity in OpenLineage (`namespace`, `name`).
///
/// Equality and ordering are derived field by field (namespace first, then name), so the field
/// order below is significant.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageDatasetId {
    /// Namespace the dataset belongs to.
    pub namespace: String,
    /// Dataset name within `namespace`.
    pub name: String,
}

impl OpenLineageDatasetId {
    /// Builds an identity from anything convertible into `String`.
    pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            namespace: namespace.into(),
            name: name.into(),
        }
    }
}
44
/// Options shared by OpenLineage payload generation helpers.
///
/// Keys are (de)serialized in camelCase. Because of `#[serde(default)]`, any missing key falls
/// back to the field's `Default` value. Which fields are required depends on the entry point:
/// every generator needs a non-blank `producer`. Job events additionally need `jobNamespace`,
/// `jobName` and `eventTime`. Run events also need `runId` and `eventType`.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct OpenLineageOptions {
    /// SQL dialect used to parse the statement and resolve lineage. Also reported in the `sql`
    /// job facet. Deserialized through `deserialize_dialect_type` (defined elsewhere).
    #[serde(deserialize_with = "deserialize_dialect_type")]
    pub dialect: DialectType,
    /// Value written to `producer` / `_producer`. Must contain non-whitespace text.
    pub producer: String,
    /// NOTE(review): presumably the default namespace for table names that have no entry in
    /// `dataset_mappings`. Resolution happens in `dataset_from_table_name` (not shown) — confirm.
    pub dataset_namespace: Option<String>,
    /// NOTE(review): appears to map SQL table names to explicit dataset identities. The lookup
    /// lives in `dataset_from_table_name` (not shown) — confirm the expected key format.
    pub dataset_mappings: BTreeMap<String, OpenLineageDatasetId>,
    /// Output dataset for a plain `SELECT`, which has no target table of its own. Required when
    /// the statement is a `SELECT` without `SELECT INTO`.
    pub output_dataset: Option<OpenLineageDatasetId>,
    /// Optional table/column metadata. Enables schema-aware lineage resolution, `SELECT *`
    /// expansion and the output dataset's `schema` facet.
    pub schema: Option<ValidationSchema>,
    /// `job.namespace` of job and run events (required by those builders).
    pub job_namespace: Option<String>,
    /// `job.name` of job and run events (required by those builders).
    pub job_name: Option<String>,
    /// `eventTime` of job and run events (required by those builders). Passed through verbatim.
    pub event_time: Option<String>,
    /// `run.runId` of run events (required there). Passed through verbatim.
    pub run_id: Option<String>,
    /// `eventType` of run events (required there).
    pub event_type: Option<OpenLineageRunEventType>,
}
62
/// OpenLineage run event type.
///
/// Serialized in SCREAMING_SNAKE_CASE (`START`, `RUNNING`, `COMPLETE`, `ABORT`, `FAIL`,
/// `OTHER`) and written as the run event's `eventType`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum OpenLineageRunEventType {
    Start,
    Running,
    Complete,
    Abort,
    Fail,
    Other,
}
74
/// Non-fatal issue encountered while generating OpenLineage output.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageWarning {
    /// Machine-readable identifier such as `W_DUPLICATE_OUTPUT_FIELD`.
    pub code: String,
    /// Human-readable explanation.
    pub message: String,
}

impl OpenLineageWarning {
    /// Builds a warning from a code and a message.
    fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
        Self {
            code: code.into(),
            message: message.into(),
        }
    }
}
91
/// Result of [`openlineage_column_lineage`]: the standalone `columnLineage` facet, the inferred
/// input datasets, the output dataset(s) with facets attached, and non-fatal warnings.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageColumnLineageResult {
    /// Column lineage for every output column. The same data is attached to the output dataset.
    pub facet: ColumnLineageDatasetFacet,
    /// Input datasets referenced by the statement (deduplicated, CTE references excluded).
    pub inputs: Vec<OpenLineageDataset>,
    /// Output dataset(s), carrying a `columnLineage` facet and, when known, a `schema` facet.
    pub outputs: Vec<OpenLineageDataset>,
    /// Non-fatal problems encountered while building the result.
    pub warnings: Vec<OpenLineageWarning>,
}
100
/// Result of the job/run event builders: the event as a JSON value plus non-fatal warnings.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageEventResult {
    /// The complete OpenLineage event document.
    pub event: Value,
    /// Warnings collected while computing the event's lineage.
    pub warnings: Vec<OpenLineageWarning>,
}
107
/// A dataset entry (`inputs` / `outputs`) in an OpenLineage payload, with optional facets.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OpenLineageDataset {
    pub namespace: String,
    pub name: String,
    /// Dataset facets keyed by facet name (e.g. `columnLineage`, `schema`). Omitted from the
    /// JSON when empty, and defaulted to empty when absent on input.
    #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
    pub facets: BTreeMap<String, Value>,
}

/// Converts a bare identity into a dataset that carries no facets.
impl std::convert::From<OpenLineageDatasetId> for OpenLineageDataset {
    fn from(id: OpenLineageDatasetId) -> Self {
        Self {
            namespace: id.namespace,
            name: id.name,
            facets: BTreeMap::new(),
        }
    }
}
125
/// The OpenLineage `columnLineage` dataset facet.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnLineageDatasetFacet {
    /// Serialized as `_producer`.
    #[serde(rename = "_producer")]
    pub producer: String,
    /// Serialized as `_schemaURL`.
    #[serde(rename = "_schemaURL")]
    pub schema_url: String,
    /// Lineage per output column name. A `BTreeMap`, so keys serialize in sorted order.
    pub fields: BTreeMap<String, ColumnLineageField>,
}
134
/// Lineage of a single output column: the source columns it reads (`inputFields`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnLineageField {
    pub input_fields: Vec<OpenLineageInputField>,
}
140
/// A source column feeding an output column: dataset identity plus column name.
///
/// Ordering is derived in field order (namespace, name, field, transformations).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageInputField {
    pub namespace: String,
    pub name: String,
    /// Column name inside the dataset.
    pub field: String,
    /// How the source column is used. Omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub transformations: Vec<OpenLineageTransformation>,
}
150
/// One entry of an input field's `transformations` list.
///
/// `terminal_fields_to_openlineage` emits `type` = `DIRECT` with a `subtype` of `IDENTITY`,
/// `TRANSFORMATION` or `AGGREGATION`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OpenLineageTransformation {
    /// Serialized as `type` (a Rust keyword, hence the trailing underscore).
    #[serde(rename = "type")]
    pub type_: String,
    pub subtype: String,
    /// Optional free-form description (e.g. the SQL of the projected expression).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Whether the transformation masks the source value. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub masking: Option<bool>,
}
161
/// What `analyze_statement` extracts from a supported SQL statement.
#[derive(Debug, Clone)]
struct StatementAnalysis {
    /// The query whose projection defines the output columns: the `SELECT` itself, the source
    /// query of an `INSERT`, or the `AS SELECT` of a `CREATE TABLE`.
    query: Expression,
    /// Input datasets referenced by `query` (deduplicated, excluding CTEs and the output).
    inputs: Vec<OpenLineageDatasetId>,
    /// Dataset written by the statement.
    output: OpenLineageDatasetId,
    /// Explicit target column list (`INSERT` columns / `CREATE TABLE` columns). Empty when absent.
    output_column_names: Vec<String>,
}
169
/// One projected output column and how to trace its lineage.
#[derive(Debug, Clone)]
struct OutputField {
    /// Name reported in the facet. May be replaced by the statement's target column list.
    name: String,
    /// Name used to look the column up in the query's projection.
    lineage_name: String,
    /// The projection expression. `None` for columns produced by `*` expansion.
    expression: Option<Expression>,
    /// Source table for columns produced by `*` expansion. `None` otherwise.
    star_source_table: Option<String>,
}
177
/// A leaf of the lineage graph: a column of a concrete source table.
///
/// The derived `Ord` (table, then field) gives a deterministic order when collected into a
/// `BTreeSet`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct TerminalField {
    /// Source table name as resolved by lineage (possibly qualified).
    table: String,
    /// Column name in `table`.
    field: String,
}
183
184/// Produce a standalone OpenLineage columnLineage facet plus inferred datasets.
185pub fn openlineage_column_lineage(
186    sql: &str,
187    options: &OpenLineageOptions,
188) -> Result<OpenLineageColumnLineageResult> {
189    validate_common_options(options)?;
190
191    let mut warnings = Vec::new();
192    let schema_mapping = options
193        .schema
194        .as_ref()
195        .map(mapping_schema_from_validation_schema);
196    let dialect = Dialect::get(options.dialect);
197    let mut expressions = dialect.parse(sql)?;
198    if expressions.len() != 1 {
199        return Err(Error::parse(
200            format!(
201                "OpenLineage generation expects exactly one statement, found {}",
202                expressions.len()
203            ),
204            0,
205            0,
206            0,
207            0,
208        ));
209    }
210
211    let expr = expressions.remove(0);
212    let analysis = analyze_statement(&expr, options, &mut warnings)?;
213    let mut output_fields = output_fields_for_query(
214        &analysis.query,
215        schema_mapping.as_ref().map(|s| s as &dyn Schema),
216        options.dialect,
217        &mut warnings,
218    )?;
219    apply_output_column_names(
220        &mut output_fields,
221        &analysis.output_column_names,
222        &mut warnings,
223    );
224
225    let mut fields = BTreeMap::new();
226    for output_field in output_fields {
227        if fields.contains_key(&output_field.name) {
228            warnings.push(OpenLineageWarning::new(
229                "W_DUPLICATE_OUTPUT_FIELD",
230                format!(
231                    "Duplicate output field '{}' was merged in the OpenLineage fields map",
232                    output_field.name
233                ),
234            ));
235        }
236
237        let input_fields = input_fields_for_output(
238            &analysis.query,
239            &output_field,
240            options,
241            schema_mapping.as_ref().map(|s| s as &dyn Schema),
242            &mut warnings,
243        )?;
244
245        fields.insert(output_field.name, ColumnLineageField { input_fields });
246    }
247
248    let mut outputs = vec![OpenLineageDataset::from(analysis.output.clone())];
249    attach_output_facets(&mut outputs[0], &analysis.output, options, &fields)?;
250
251    Ok(OpenLineageColumnLineageResult {
252        facet: ColumnLineageDatasetFacet {
253            producer: options.producer.clone(),
254            schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
255            fields,
256        },
257        inputs: analysis
258            .inputs
259            .into_iter()
260            .map(OpenLineageDataset::from)
261            .collect(),
262        outputs,
263        warnings,
264    })
265}
266
267/// Produce an OpenLineage JobEvent as JSON.
268pub fn openlineage_job_event(
269    sql: &str,
270    options: &OpenLineageOptions,
271) -> Result<OpenLineageEventResult> {
272    let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
273    let job_name = required_option(&options.job_name, "jobName")?;
274    let event_time = required_option(&options.event_time, "eventTime")?;
275
276    let result = openlineage_column_lineage(sql, options)?;
277    let event = json!({
278        "eventTime": event_time,
279        "producer": options.producer,
280        "schemaURL": OPENLINEAGE_SCHEMA_URL,
281        "job": {
282            "namespace": job_namespace,
283            "name": job_name,
284            "facets": job_facets(sql, options),
285        },
286        "inputs": result.inputs,
287        "outputs": result.outputs,
288    });
289
290    Ok(OpenLineageEventResult {
291        event,
292        warnings: result.warnings,
293    })
294}
295
296/// Produce an OpenLineage RunEvent as JSON.
297pub fn openlineage_run_event(
298    sql: &str,
299    options: &OpenLineageOptions,
300) -> Result<OpenLineageEventResult> {
301    let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
302    let job_name = required_option(&options.job_name, "jobName")?;
303    let event_time = required_option(&options.event_time, "eventTime")?;
304    let run_id = required_option(&options.run_id, "runId")?;
305    let event_type = options
306        .event_type
307        .ok_or_else(|| Error::parse("Missing required option: eventType", 0, 0, 0, 0))?;
308
309    let result = openlineage_column_lineage(sql, options)?;
310    let event = json!({
311        "eventTime": event_time,
312        "eventType": event_type,
313        "producer": options.producer,
314        "schemaURL": OPENLINEAGE_SCHEMA_URL,
315        "run": {
316            "runId": run_id,
317            "facets": {},
318        },
319        "job": {
320            "namespace": job_namespace,
321            "name": job_name,
322            "facets": job_facets(sql, options),
323        },
324        "inputs": result.inputs,
325        "outputs": result.outputs,
326    });
327
328    Ok(OpenLineageEventResult {
329        event,
330        warnings: result.warnings,
331    })
332}
333
334fn validate_common_options(options: &OpenLineageOptions) -> Result<()> {
335    if options.producer.trim().is_empty() {
336        return Err(Error::parse(
337            "Missing required option: producer",
338            0,
339            0,
340            0,
341            0,
342        ));
343    }
344    Ok(())
345}
346
347fn required_option(value: &Option<String>, name: &str) -> Result<String> {
348    match value.as_ref().filter(|v| !v.trim().is_empty()) {
349        Some(value) => Ok(value.clone()),
350        None => Err(Error::parse(
351            format!("Missing required option: {name}"),
352            0,
353            0,
354            0,
355            0,
356        )),
357    }
358}
359
360fn analyze_statement(
361    expr: &Expression,
362    options: &OpenLineageOptions,
363    warnings: &mut Vec<OpenLineageWarning>,
364) -> Result<StatementAnalysis> {
365    match expr {
366        Expression::Prepare(prepare) => analyze_statement(&prepare.statement, options, warnings),
367        Expression::Select(select) => {
368            let output = if let Some(into) = &select.into {
369                dataset_from_expression(&into.this, options)?
370            } else {
371                options.output_dataset.clone().ok_or_else(|| {
372                    Error::parse(
373                        "OpenLineage outputDataset is required for SELECT statements without SELECT INTO",
374                        0,
375                        0,
376                        0,
377                        0,
378                    )
379                })?
380            };
381            Ok(StatementAnalysis {
382                query: expr.clone(),
383                inputs: collect_input_datasets(expr, options, Some(&output), warnings)?,
384                output,
385                output_column_names: Vec::new(),
386            })
387        }
388        Expression::Insert(insert) => {
389            let output = dataset_from_table_ref(&insert.table, options)?;
390            let query = insert.query.clone().ok_or_else(|| {
391                Error::unsupported(
392                    "OpenLineage column lineage for INSERT without query",
393                    options.dialect.to_string(),
394                )
395            })?;
396            Ok(StatementAnalysis {
397                inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
398                query,
399                output,
400                output_column_names: insert.columns.iter().map(|col| col.name.clone()).collect(),
401            })
402        }
403        Expression::CreateTable(create) => {
404            let output = dataset_from_table_ref(&create.name, options)?;
405            let query = create.as_select.clone().ok_or_else(|| {
406                Error::unsupported(
407                    "OpenLineage column lineage for CREATE TABLE without AS SELECT",
408                    options.dialect.to_string(),
409                )
410            })?;
411            Ok(StatementAnalysis {
412                inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
413                query,
414                output,
415                output_column_names: create
416                    .columns
417                    .iter()
418                    .map(|col| col.name.name.clone())
419                    .collect(),
420            })
421        }
422        _ => Err(Error::unsupported(
423            format!("OpenLineage generation for {}", expr.variant_name()),
424            options.dialect.to_string(),
425        )),
426    }
427}
428
429fn output_fields_for_query(
430    query: &Expression,
431    schema: Option<&dyn Schema>,
432    dialect: DialectType,
433    warnings: &mut Vec<OpenLineageWarning>,
434) -> Result<Vec<OutputField>> {
435    let select = leftmost_select(query).ok_or_else(|| {
436        Error::unsupported(
437            "OpenLineage output field extraction for non-SELECT query",
438            dialect.to_string(),
439        )
440    })?;
441
442    let mut fields = Vec::new();
443    for (idx, expr) in select.expressions.iter().enumerate() {
444        if is_star_expr(expr) {
445            expand_star_output_fields(select, expr, schema, warnings, &mut fields);
446            continue;
447        }
448
449        let name = output_name(expr).unwrap_or_else(|| format!("_{idx}"));
450        fields.push(OutputField {
451            lineage_name: name.clone(),
452            name,
453            expression: Some(expr.clone()),
454            star_source_table: None,
455        });
456    }
457    Ok(fields)
458}
459
460fn apply_output_column_names(
461    fields: &mut [OutputField],
462    output_column_names: &[String],
463    warnings: &mut Vec<OpenLineageWarning>,
464) {
465    if output_column_names.is_empty() {
466        return;
467    }
468    if output_column_names.len() != fields.len() {
469        warnings.push(OpenLineageWarning::new(
470            "W_OUTPUT_COLUMN_COUNT_MISMATCH",
471            format!(
472                "Target column count ({}) does not match projected column count ({})",
473                output_column_names.len(),
474                fields.len()
475            ),
476        ));
477        return;
478    }
479    for (field, output_name) in fields.iter_mut().zip(output_column_names) {
480        field.name = output_name.clone();
481    }
482}
483
484fn input_fields_for_output(
485    query: &Expression,
486    output_field: &OutputField,
487    options: &OpenLineageOptions,
488    schema: Option<&dyn Schema>,
489    warnings: &mut Vec<OpenLineageWarning>,
490) -> Result<Vec<OpenLineageInputField>> {
491    if let Some(table) = &output_field.star_source_table {
492        return terminal_fields_to_openlineage(
493            vec![TerminalField {
494                table: table.clone(),
495                field: output_field.lineage_name.clone(),
496            }],
497            "IDENTITY",
498            Some(format!("SELECT {}", output_field.lineage_name)),
499            options,
500            warnings,
501        );
502    }
503
504    let lineage_result = if let Some(schema) = schema {
505        lineage::lineage_with_schema(
506            &output_field.lineage_name,
507            query,
508            Some(schema),
509            Some(options.dialect),
510            false,
511        )
512    } else {
513        lineage::lineage(
514            &output_field.lineage_name,
515            query,
516            Some(options.dialect),
517            false,
518        )
519    };
520
521    let node = match lineage_result {
522        Ok(node) => node,
523        Err(err) => {
524            warnings.push(OpenLineageWarning::new(
525                "W_UNRESOLVED_OUTPUT_FIELD",
526                format!(
527                    "Could not resolve lineage for output field '{}': {}",
528                    output_field.name, err
529                ),
530            ));
531            return Ok(Vec::new());
532        }
533    };
534
535    let mut terminals = BTreeSet::new();
536    collect_terminal_fields(&node, &mut terminals);
537    let terminals: Vec<TerminalField> = terminals.into_iter().collect();
538
539    if terminals.is_empty() {
540        warnings.push(OpenLineageWarning::new(
541            "W_EMPTY_FIELD_LINEAGE",
542            format!(
543                "No input fields were found for output field '{}'",
544                output_field.name
545            ),
546        ));
547        return Ok(Vec::new());
548    }
549
550    let subtype = transformation_subtype(output_field.expression.as_ref(), &terminals);
551    let description = output_field
552        .expression
553        .as_ref()
554        .and_then(|expr| transformation_description(expr, options.dialect));
555
556    terminal_fields_to_openlineage(terminals, subtype, description, options, warnings)
557}
558
559fn transformation_description(expr: &Expression, dialect: DialectType) -> Option<String> {
560    #[cfg(feature = "generate")]
561    {
562        Some(expr.sql_for(dialect))
563    }
564
565    #[cfg(not(feature = "generate"))]
566    {
567        let _ = (expr, dialect);
568        None
569    }
570}
571
572fn terminal_fields_to_openlineage(
573    terminals: Vec<TerminalField>,
574    subtype: &str,
575    description: Option<String>,
576    options: &OpenLineageOptions,
577    warnings: &mut Vec<OpenLineageWarning>,
578) -> Result<Vec<OpenLineageInputField>> {
579    let mut result = Vec::new();
580    for terminal in terminals {
581        let dataset = dataset_from_table_name(&terminal.table, options).map_err(|err| {
582            warnings.push(OpenLineageWarning::new(
583                "W_UNRESOLVED_DATASET",
584                format!(
585                    "Could not map table '{}' to an OpenLineage dataset: {}",
586                    terminal.table, err
587                ),
588            ));
589            err
590        })?;
591        result.push(OpenLineageInputField {
592            namespace: dataset.namespace,
593            name: dataset.name,
594            field: terminal.field,
595            transformations: vec![OpenLineageTransformation {
596                type_: "DIRECT".to_string(),
597                subtype: subtype.to_string(),
598                description: description.clone(),
599                masking: Some(false),
600            }],
601        });
602    }
603    Ok(result)
604}
605
606fn transformation_subtype(expr: Option<&Expression>, terminals: &[TerminalField]) -> &'static str {
607    let Some(expr) = expr else {
608        return "TRANSFORMATION";
609    };
610    let unaliased = unalias(expr);
611    if expression_contains_aggregate(unaliased) {
612        return "AGGREGATION";
613    }
614    if terminals.len() == 1 {
615        if let Expression::Column(col) = unaliased {
616            if col.name.name == terminals[0].field {
617                return "IDENTITY";
618            }
619        }
620    }
621    "TRANSFORMATION"
622}
623
624fn collect_terminal_fields(node: &LineageNode, terminals: &mut BTreeSet<TerminalField>) {
625    if node.downstream.is_empty() {
626        if let Expression::Column(column) = &node.expression {
627            let table = if !node.source_name.is_empty() {
628                Some(node.source_name.clone())
629            } else if let Expression::Table(table) = &node.source {
630                Some(table_ref_qualified_name(table))
631            } else {
632                column.table.as_ref().map(|t| t.name.clone())
633            };
634            if let Some(table) = table.filter(|t| !t.is_empty()) {
635                terminals.insert(TerminalField {
636                    table,
637                    field: column.name.name.clone(),
638                });
639            }
640        }
641        return;
642    }
643
644    for child in &node.downstream {
645        collect_terminal_fields(child, terminals);
646    }
647}
648
649fn expression_contains_aggregate(expr: &Expression) -> bool {
650    expr.contains(|node| {
651        matches!(
652            node,
653            Expression::AggregateFunction(_)
654                | Expression::Sum(_)
655                | Expression::Count(_)
656                | Expression::Avg(_)
657                | Expression::Min(_)
658                | Expression::Max(_)
659                | Expression::GroupConcat(_)
660                | Expression::StringAgg(_)
661                | Expression::ListAgg(_)
662                | Expression::ArrayAgg(_)
663                | Expression::CountIf(_)
664                | Expression::SumIf(_)
665                | Expression::Stddev(_)
666                | Expression::StddevPop(_)
667                | Expression::StddevSamp(_)
668                | Expression::Variance(_)
669                | Expression::VarPop(_)
670                | Expression::VarSamp(_)
671                | Expression::Median(_)
672                | Expression::Mode(_)
673                | Expression::First(_)
674                | Expression::Last(_)
675                | Expression::AnyValue(_)
676                | Expression::ApproxDistinct(_)
677                | Expression::ApproxCountDistinct(_)
678                | Expression::ApproxPercentile(_)
679                | Expression::Percentile(_)
680                | Expression::LogicalAnd(_)
681                | Expression::LogicalOr(_)
682                | Expression::Skewness(_)
683                | Expression::BitwiseCount(_)
684                | Expression::ArrayConcatAgg(_)
685                | Expression::ArrayUniqueAgg(_)
686                | Expression::BoolXorAgg(_)
687                | Expression::ParameterizedAgg(_)
688                | Expression::ArgMax(_)
689                | Expression::ArgMin(_)
690                | Expression::ApproxTopK(_)
691                | Expression::ApproxTopKAccumulate(_)
692                | Expression::ApproxTopKCombine(_)
693                | Expression::ApproxTopKEstimate(_)
694                | Expression::ApproxTopSum(_)
695                | Expression::ApproxQuantiles(_)
696                | Expression::Grouping(_)
697                | Expression::GroupingId(_)
698                | Expression::AnonymousAggFunc(_)
699                | Expression::CombinedAggFunc(_)
700                | Expression::CombinedParameterizedAgg(_)
701                | Expression::HashAgg(_)
702                | Expression::ObjectAgg(_)
703                | Expression::AIAgg(_)
704        )
705    })
706}
707
708fn collect_input_datasets(
709    expr: &Expression,
710    options: &OpenLineageOptions,
711    output: Option<&OpenLineageDatasetId>,
712    warnings: &mut Vec<OpenLineageWarning>,
713) -> Result<Vec<OpenLineageDatasetId>> {
714    let cte_aliases = collect_cte_aliases(expr, options.dialect);
715    let mut seen = BTreeSet::new();
716    let mut result = Vec::new();
717
718    for table in expr.dfs().filter_map(|node| match node {
719        Expression::Table(table) => Some(table),
720        _ => None,
721    }) {
722        let qname = table_ref_qualified_name(table);
723        let normalized = normalize_identifier(&table.name.name, options.dialect, true);
724        if cte_aliases.contains(&normalized) {
725            continue;
726        }
727        if output
728            .map(|out| out.name == qname || out.name == table.name.name)
729            .unwrap_or(false)
730        {
731            continue;
732        }
733        match dataset_from_table_name(&qname, options) {
734            Ok(dataset) => {
735                if seen.insert((dataset.namespace.clone(), dataset.name.clone())) {
736                    result.push(dataset);
737                }
738            }
739            Err(err) => warnings.push(OpenLineageWarning::new(
740                "W_UNRESOLVED_DATASET",
741                format!("Could not map input table '{qname}': {err}"),
742            )),
743        }
744    }
745
746    Ok(result)
747}
748
749fn attach_output_facets(
750    output: &mut OpenLineageDataset,
751    output_id: &OpenLineageDatasetId,
752    options: &OpenLineageOptions,
753    fields: &BTreeMap<String, ColumnLineageField>,
754) -> Result<()> {
755    let column_lineage = ColumnLineageDatasetFacet {
756        producer: options.producer.clone(),
757        schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
758        fields: fields.clone(),
759    };
760    output.facets.insert(
761        "columnLineage".to_string(),
762        serde_json::to_value(column_lineage).map_err(openlineage_serialization_error)?,
763    );
764
765    if let Some(schema_facet) = schema_facet_for_dataset(output_id, options) {
766        output.facets.insert(
767            "schema".to_string(),
768            serde_json::to_value(schema_facet).map_err(openlineage_serialization_error)?,
769        );
770    }
771
772    Ok(())
773}
774
775fn job_facets(sql: &str, options: &OpenLineageOptions) -> Value {
776    json!({
777        "sql": {
778            "_producer": options.producer,
779            "_schemaURL": SQL_JOB_FACET_SCHEMA_URL,
780            "query": sql,
781            "dialect": options.dialect.to_string(),
782        },
783        "jobType": {
784            "_producer": options.producer,
785            "_schemaURL": JOB_TYPE_JOB_FACET_SCHEMA_URL,
786            "processingType": "BATCH",
787            "integration": "POLYGLOT_SQL",
788            "jobType": "QUERY",
789        }
790    })
791}
792
/// The OpenLineage `schema` dataset facet, built from `options.schema` for the output table.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacet {
    /// Serialized as `_producer`.
    #[serde(rename = "_producer")]
    producer: String,
    /// Serialized as `_schemaURL`.
    #[serde(rename = "_schemaURL")]
    schema_url: String,
    /// Columns in declaration order.
    fields: Vec<SchemaDatasetFacetField>,
}
801
/// One column entry of the `schema` dataset facet.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacetField {
    name: String,
    /// Serialized as `type`. Omitted when empty and defaulted to empty on input.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    #[serde(rename = "type")]
    data_type: String,
    /// 1-based column position, as filled in by `schema_facet_for_dataset`.
    /// NOTE(review): there is no `rename_all` on this struct, so this serializes as
    /// `ordinal_position` (snake_case). Confirm this matches the targeted facet spec version.
    #[serde(skip_serializing_if = "Option::is_none")]
    ordinal_position: Option<usize>,
}
811
812fn schema_facet_for_dataset(
813    output: &OpenLineageDatasetId,
814    options: &OpenLineageOptions,
815) -> Option<SchemaDatasetFacet> {
816    let schema = options.schema.as_ref()?;
817    let table = schema.tables.iter().find(|table| {
818        let qname = if let Some(schema_name) = &table.schema {
819            format!("{}.{}", schema_name, table.name)
820        } else {
821            table.name.clone()
822        };
823        output.name == table.name || output.name == qname
824    })?;
825
826    Some(SchemaDatasetFacet {
827        producer: options.producer.clone(),
828        schema_url: SCHEMA_DATASET_FACET_SCHEMA_URL.to_string(),
829        fields: table
830            .columns
831            .iter()
832            .enumerate()
833            .map(|(idx, col)| SchemaDatasetFacetField {
834                name: col.name.clone(),
835                data_type: col.data_type.clone(),
836                ordinal_position: Some(idx + 1),
837            })
838            .collect(),
839    })
840}
841
842fn expand_star_output_fields(
843    select: &Select,
844    star_expr: &Expression,
845    schema: Option<&dyn Schema>,
846    warnings: &mut Vec<OpenLineageWarning>,
847    fields: &mut Vec<OutputField>,
848) {
849    let Some(schema) = schema else {
850        warnings.push(OpenLineageWarning::new(
851            "W_STAR_WITHOUT_SCHEMA",
852            "SELECT * cannot be expanded into OpenLineage column lineage without schema metadata",
853        ));
854        return;
855    };
856
857    let qualifier = star_qualifier(star_expr);
858    let sources = select_source_tables(select);
859    for (alias, qname) in sources {
860        if qualifier
861            .as_ref()
862            .map(|q| q != &alias && q != &qname)
863            .unwrap_or(false)
864        {
865            continue;
866        }
867        match schema.column_names(&qname) {
868            Ok(columns) => {
869                for name in columns {
870                    fields.push(OutputField {
871                        lineage_name: name.clone(),
872                        name,
873                        expression: None,
874                        star_source_table: Some(qname.clone()),
875                    });
876                }
877            }
878            Err(err) => warnings.push(OpenLineageWarning::new(
879                "W_STAR_SCHEMA_LOOKUP_FAILED",
880                format!("Could not expand SELECT * for table '{}': {}", qname, err),
881            )),
882        }
883    }
884}
885
886fn select_source_tables(select: &Select) -> Vec<(String, String)> {
887    let mut result = Vec::new();
888    if let Some(from) = &select.from {
889        for expr in &from.expressions {
890            collect_source_table(expr, &mut result);
891        }
892    }
893    for join in &select.joins {
894        collect_source_table(&join.this, &mut result);
895    }
896    result
897}
898
899fn collect_source_table(expr: &Expression, result: &mut Vec<(String, String)>) {
900    match expr {
901        Expression::Table(table) => {
902            let qname = table_ref_qualified_name(table);
903            let alias = table
904                .alias
905                .as_ref()
906                .map(|a| a.name.clone())
907                .unwrap_or_else(|| table.name.name.clone());
908            result.push((alias, qname));
909        }
910        Expression::Alias(alias) => collect_source_table(&alias.this, result),
911        Expression::Paren(paren) => collect_source_table(&paren.this, result),
912        _ => {}
913    }
914}
915
916fn leftmost_select(expr: &Expression) -> Option<&Select> {
917    match expr {
918        Expression::Prepare(prepare) => leftmost_select(&prepare.statement),
919        Expression::Select(select) => Some(select),
920        Expression::Union(union) => leftmost_select(&union.left),
921        Expression::Intersect(intersect) => leftmost_select(&intersect.left),
922        Expression::Except(except) => leftmost_select(&except.left),
923        Expression::Subquery(subquery) => leftmost_select(&subquery.this),
924        _ => None,
925    }
926}
927
928fn output_name(expr: &Expression) -> Option<String> {
929    match expr {
930        Expression::Alias(alias) => Some(alias.alias.name.clone()),
931        Expression::Column(col) => Some(col.name.name.clone()),
932        Expression::Identifier(id) => Some(id.name.clone()),
933        Expression::Annotated(a) => output_name(&a.this),
934        _ => None,
935    }
936}
937
938fn unalias(expr: &Expression) -> &Expression {
939    match expr {
940        Expression::Alias(alias) => &alias.this,
941        Expression::Annotated(a) => unalias(&a.this),
942        _ => expr,
943    }
944}
945
946fn is_star_expr(expr: &Expression) -> bool {
947    matches!(expr, Expression::Star(_))
948        || matches!(expr, Expression::Column(col) if col.name.name == "*")
949}
950
951fn star_qualifier(expr: &Expression) -> Option<String> {
952    match expr {
953        Expression::Star(star) => star.table.as_ref().map(|t| t.name.clone()),
954        Expression::Column(col) if col.name.name == "*" => {
955            col.table.as_ref().map(|t| t.name.clone())
956        }
957        _ => None,
958    }
959}
960
961fn dataset_from_expression(
962    expr: &Expression,
963    options: &OpenLineageOptions,
964) -> Result<OpenLineageDatasetId> {
965    match expr {
966        Expression::Table(table) => dataset_from_table_ref(table, options),
967        Expression::Identifier(id) => dataset_from_table_name(&id.name, options),
968        _ => Err(Error::unsupported(
969            "OpenLineage dataset extraction from non-table expression",
970            options.dialect.to_string(),
971        )),
972    }
973}
974
975fn dataset_from_table_ref(
976    table: &TableRef,
977    options: &OpenLineageOptions,
978) -> Result<OpenLineageDatasetId> {
979    dataset_from_table_name(&table_ref_qualified_name(table), options)
980}
981
982fn dataset_from_table_name(
983    table_name: &str,
984    options: &OpenLineageOptions,
985) -> Result<OpenLineageDatasetId> {
986    if let Some(mapped) = options.dataset_mappings.get(table_name) {
987        return Ok(mapped.clone());
988    }
989    let namespace = options.dataset_namespace.as_ref().ok_or_else(|| {
990        Error::parse(
991            format!(
992                "Missing datasetNamespace or explicit dataset mapping for table '{}'",
993                table_name
994            ),
995            0,
996            0,
997            0,
998            0,
999        )
1000    })?;
1001    Ok(OpenLineageDatasetId::new(namespace, table_name))
1002}
1003
1004fn table_ref_qualified_name(table: &TableRef) -> String {
1005    let mut parts = Vec::new();
1006    if let Some(catalog) = &table.catalog {
1007        parts.push(catalog.name.clone());
1008    }
1009    if let Some(schema) = &table.schema {
1010        parts.push(schema.name.clone());
1011    }
1012    parts.push(table.name.name.clone());
1013    parts.join(".")
1014}
1015
1016fn collect_cte_aliases(expr: &Expression, dialect: DialectType) -> HashSet<String> {
1017    let mut aliases = HashSet::new();
1018    for node in expr.dfs() {
1019        match node {
1020            Expression::Select(select) => {
1021                if let Some(with) = &select.with {
1022                    collect_with_aliases(with, dialect, &mut aliases);
1023                }
1024            }
1025            Expression::Union(union) => {
1026                if let Some(with) = &union.with {
1027                    collect_with_aliases(with, dialect, &mut aliases);
1028                }
1029            }
1030            Expression::Intersect(intersect) => {
1031                if let Some(with) = &intersect.with {
1032                    collect_with_aliases(with, dialect, &mut aliases);
1033                }
1034            }
1035            Expression::Except(except) => {
1036                if let Some(with) = &except.with {
1037                    collect_with_aliases(with, dialect, &mut aliases);
1038                }
1039            }
1040            _ => {}
1041        }
1042    }
1043    aliases
1044}
1045
1046fn collect_with_aliases(with: &With, dialect: DialectType, aliases: &mut HashSet<String>) {
1047    for cte in &with.ctes {
1048        aliases.insert(normalize_identifier(&cte.alias.name, dialect, true));
1049    }
1050}
1051
1052fn normalize_identifier(name: &str, dialect: DialectType, is_table: bool) -> String {
1053    crate::schema::normalize_name(name, Some(dialect), is_table, true)
1054}
1055
1056fn openlineage_serialization_error(err: serde_json::Error) -> Error {
1057    Error::internal(format!("OpenLineage serialization failed: {err}"))
1058}
1059
1060fn deserialize_dialect_type<'de, D>(deserializer: D) -> std::result::Result<DialectType, D::Error>
1061where
1062    D: Deserializer<'de>,
1063{
1064    let value = String::deserialize(deserializer)?;
1065    value.parse::<DialectType>().map_err(de::Error::custom)
1066}
1067
// Unit tests for OpenLineage payload generation: column-lineage facets,
// input/output dataset inference, job/run event envelopes, and SELECT *
// expansion with and without schema metadata.
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline options shared by most tests.
    ///
    /// - PostgreSQL dialect.
    /// - `postgres://warehouse` dataset namespace.
    /// - Explicit `analytics.out` output dataset.
    /// - Fixed job namespace/name, event time, run id, and event type, so
    ///   event payloads can be asserted exactly.
    fn options() -> OpenLineageOptions {
        OpenLineageOptions {
            dialect: DialectType::PostgreSQL,
            producer: "https://github.com/tobilg/polyglot".to_string(),
            dataset_namespace: Some("postgres://warehouse".to_string()),
            output_dataset: Some(OpenLineageDatasetId::new(
                "postgres://warehouse",
                "analytics.out",
            )),
            job_namespace: Some("polyglot-tests".to_string()),
            job_name: Some("lineage-test".to_string()),
            event_time: Some("2026-05-18T00:00:00Z".to_string()),
            run_id: Some("3b452093-782c-4ef2-9c0c-aafe2aa6f34d".to_string()),
            event_type: Some(OpenLineageRunEventType::Complete),
            ..Default::default()
        }
    }

    // The dialect field accepts the "postgres" spelling when deserialized from JSON.
    #[test]
    fn deserializes_dialect_aliases_in_options() {
        let options: OpenLineageOptions =
            serde_json::from_str(r#"{"producer":"polyglot","dialect":"postgres"}"#)
                .expect("options");
        assert_eq!(options.dialect, DialectType::PostgreSQL);
    }

    // A plain column projection maps 1:1 to its source column with an IDENTITY subtype.
    #[test]
    fn emits_identity_column_lineage_for_select() {
        let result = openlineage_column_lineage("SELECT a FROM t", &options()).expect("lineage");
        let field = result.facet.fields.get("a").expect("field a");
        assert_eq!(field.input_fields.len(), 1);
        assert_eq!(field.input_fields[0].name, "t");
        assert_eq!(field.input_fields[0].field, "a");
        assert_eq!(field.input_fields[0].transformations[0].subtype, "IDENTITY");
    }

    // The SELECT inside a PREPARE statement is still analysed for lineage.
    #[test]
    fn emits_column_lineage_for_prepared_statement_body() {
        let result = openlineage_column_lineage(
            "PREPARE leak AS SELECT id FROM sensitive_table WHERE id = $1",
            &options(),
        )
        .expect("lineage");
        let field = result.facet.fields.get("id").expect("field id");
        assert_eq!(field.input_fields.len(), 1);
        assert_eq!(field.input_fields[0].name, "sensitive_table");
        assert_eq!(field.input_fields[0].field, "id");
    }

    // A table alias (`o`) resolves back to the real input dataset name (`orders`).
    #[test]
    fn resolves_input_dataset_behind_table_alias() {
        let result = openlineage_column_lineage("SELECT o.total FROM orders o", &options())
            .expect("lineage");
        let field = result.facet.fields.get("total").expect("field total");
        assert_eq!(field.input_fields[0].name, "orders");
        assert_eq!(field.input_fields[0].field, "total");
    }

    // A computed expression lists every referenced column with a TRANSFORMATION subtype.
    #[test]
    fn emits_transformation_column_lineage_for_expression() {
        let result =
            openlineage_column_lineage("SELECT a + b AS c FROM t", &options()).expect("lineage");
        let field = result.facet.fields.get("c").expect("field c");
        assert_eq!(field.input_fields.len(), 2);
        assert!(field.input_fields.iter().any(|f| f.field == "a"));
        assert!(field.input_fields.iter().any(|f| f.field == "b"));
        assert!(field
            .input_fields
            .iter()
            .all(|f| f.transformations[0].subtype == "TRANSFORMATION"));
    }

    // Regression (issue 207): BigQuery's `SAFE.` function prefix must not be
    // reported as an input column, while the real argument (`data`) still is.
    #[test]
    fn omits_bigquery_safe_namespace_from_column_lineage_issue207() {
        let mut opts = options();
        opts.dialect = DialectType::BigQuery;

        let result = openlineage_column_lineage(
            r#"
WITH import_cte AS (
  SELECT timestamp, data, operation
  FROM `project`.`dataset`.`source_table`
),
transform_cte AS (
  SELECT
    timestamp,
    SAFE.PARSE_JSON(data) AS json_data
  FROM import_cte
)
SELECT json_data FROM transform_cte
"#,
            &opts,
        )
        .expect("lineage");
        let field = result.facet.fields.get("json_data").expect("json_data");

        assert!(
            field.input_fields.iter().any(|input| input.field == "data"),
            "expected data input field, got {:?}",
            field.input_fields
        );
        assert!(
            !field
                .input_fields
                .iter()
                .any(|input| input.field.eq_ignore_ascii_case("safe")),
            "did not expect SAFE namespace as input field, got {:?}",
            field.input_fields
        );
    }

    // Regression (issue 209): a column produced by `UNNEST(...) AS date_val` is
    // attributed to `date_val` instead of yielding an empty-lineage warning.
    #[test]
    fn emits_bigquery_unnest_alias_column_lineage_issue209() {
        let mut opts = options();
        opts.dialect = DialectType::BigQuery;
        opts.dataset_namespace = Some("bigquery://warehouse".to_string());
        opts.output_dataset = Some(OpenLineageDatasetId::new(
            "bigquery://warehouse",
            "calendar",
        ));

        let result = openlineage_column_lineage(
            r#"
SELECT date_val AS week_start
FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
"#,
            &opts,
        )
        .expect("lineage");
        let field = result.facet.fields.get("week_start").expect("week_start");

        assert_eq!(field.input_fields.len(), 1);
        assert_eq!(field.input_fields[0].name, "date_val");
        assert_eq!(field.input_fields[0].field, "date_val");
        assert!(
            result
                .warnings
                .iter()
                .all(|warning| warning.code != "W_EMPTY_FIELD_LINEAGE"),
            "did not expect empty-lineage warning, got {:?}",
            result.warnings
        );
    }

    // An aggregate function call is classified with an AGGREGATION subtype.
    #[test]
    fn emits_aggregation_column_lineage() {
        let result =
            openlineage_column_lineage("SELECT SUM(amount) AS total FROM orders", &options())
                .expect("lineage");
        let field = result.facet.fields.get("total").expect("field total");
        assert_eq!(field.input_fields[0].field, "amount");
        assert_eq!(
            field.input_fields[0].transformations[0].subtype,
            "AGGREGATION"
        );
    }

    // With no configured output dataset, the INSERT target becomes the output
    // and the SELECT source becomes the input.
    #[test]
    fn infers_insert_output_dataset() {
        let mut opts = options();
        opts.output_dataset = None;
        let result =
            openlineage_column_lineage("INSERT INTO analytics.out SELECT a FROM raw.input", &opts)
                .expect("lineage");
        assert_eq!(result.outputs[0].name, "analytics.out");
        assert_eq!(result.inputs[0].name, "raw.input");
    }

    // Output fields are keyed by the INSERT column list, not by the source projection names.
    #[test]
    fn maps_insert_target_columns_to_output_fields() {
        let mut opts = options();
        opts.output_dataset = None;
        let result = openlineage_column_lineage(
            "INSERT INTO analytics.out (target_a) SELECT source_a FROM raw.input",
            &opts,
        )
        .expect("lineage");
        let field = result.facet.fields.get("target_a").expect("target field");
        assert_eq!(field.input_fields[0].field, "source_a");
        assert!(!result.facet.fields.contains_key("source_a"));
    }

    // A bare SELECT has no target table, so an explicit outputDataset is required.
    #[test]
    fn pure_select_requires_output_dataset() {
        let mut opts = options();
        opts.output_dataset = None;
        let err = openlineage_column_lineage("SELECT a FROM t", &opts).unwrap_err();
        assert!(err.to_string().contains("outputDataset is required"));
    }

    // Job event carries the job namespace, the SQL job facet schema URL, and
    // the column-lineage facet on its output dataset.
    #[test]
    fn emits_job_event_payload() {
        let result = openlineage_job_event("SELECT a FROM t", &options()).expect("event");
        assert_eq!(result.event["job"]["namespace"], "polyglot-tests");
        assert_eq!(
            result.event["job"]["facets"]["sql"]["_schemaURL"],
            SQL_JOB_FACET_SCHEMA_URL
        );
        assert_eq!(
            result.event["outputs"][0]["facets"]["columnLineage"]["fields"]["a"]["inputFields"][0]
                ["field"],
            "a"
        );
    }

    // Run event serializes the configured event type and run id.
    #[test]
    fn emits_run_event_payload() {
        let result = openlineage_run_event("SELECT a FROM t", &options()).expect("event");
        assert_eq!(result.event["eventType"], "COMPLETE");
        assert_eq!(
            result.event["run"]["runId"],
            "3b452093-782c-4ef2-9c0c-aafe2aa6f34d"
        );
    }

    // Without schema metadata SELECT * cannot be expanded: no fields, plus a warning.
    #[test]
    fn select_star_without_schema_warns() {
        let result = openlineage_column_lineage("SELECT * FROM t", &options()).expect("lineage");
        assert!(result.facet.fields.is_empty());
        assert!(result
            .warnings
            .iter()
            .any(|w| w.code == "W_STAR_WITHOUT_SCHEMA"));
    }

    // With schema metadata for `t`, SELECT * expands into one field per declared column.
    #[test]
    fn select_star_with_schema_expands_fields() {
        let mut opts = options();
        opts.schema = Some(ValidationSchema {
            strict: None,
            tables: vec![crate::validation::SchemaTable {
                name: "t".to_string(),
                schema: None,
                columns: vec![
                    crate::validation::SchemaColumn {
                        name: "a".to_string(),
                        data_type: "INT".to_string(),
                        nullable: None,
                        primary_key: false,
                        unique: false,
                        references: None,
                    },
                    crate::validation::SchemaColumn {
                        name: "b".to_string(),
                        data_type: "TEXT".to_string(),
                        nullable: None,
                        primary_key: false,
                        unique: false,
                        references: None,
                    },
                ],
                aliases: vec![],
                primary_key: vec![],
                unique_keys: vec![],
                foreign_keys: vec![],
            }],
        });

        let result = openlineage_column_lineage("SELECT * FROM t", &opts).expect("lineage");
        assert!(result.facet.fields.contains_key("a"));
        assert!(result.facet.fields.contains_key("b"));
    }
}