Skip to main content

polyglot_sql/
openlineage.rs

1//! OpenLineage-compatible payload generation.
2//!
3//! This module only builds OpenLineage JSON-compatible structures from SQL
4//! analysis. It deliberately does not implement transports, clients, retries,
5//! buffering, or runtime lifecycle management.
6
7use crate::dialects::{Dialect, DialectType};
8use crate::expressions::*;
9use crate::lineage::{self, LineageNode};
10use crate::schema::Schema;
11use crate::traversal::ExpressionWalk;
12use crate::{mapping_schema_from_validation_schema, Error, Result, ValidationSchema};
13use serde::de::{self, Deserializer};
14use serde::{Deserialize, Serialize};
15use serde_json::{json, Value};
16use std::collections::{BTreeMap, BTreeSet, HashSet};
17
/// `schemaURL` of the core OpenLineage event specification used for job and run events.
pub const OPENLINEAGE_SCHEMA_URL: &str =
    concat!("https://openlineage.io/spec/", "2-0-2/OpenLineage.json");
/// `_schemaURL` of the `columnLineage` dataset facet.
pub const COLUMN_LINEAGE_FACET_SCHEMA_URL: &str = concat!(
    "https://openlineage.io/spec/facets/",
    "1-2-0/ColumnLineageDatasetFacet.json"
);
/// `_schemaURL` of the `sql` job facet.
pub const SQL_JOB_FACET_SCHEMA_URL: &str =
    concat!("https://openlineage.io/spec/facets/", "1-1-0/SQLJobFacet.json");
/// `_schemaURL` of the `jobType` job facet.
pub const JOB_TYPE_JOB_FACET_SCHEMA_URL: &str =
    concat!("https://openlineage.io/spec/facets/", "2-0-3/JobTypeJobFacet.json");
/// `_schemaURL` of the `schema` dataset facet.
pub const SCHEMA_DATASET_FACET_SCHEMA_URL: &str =
    concat!("https://openlineage.io/spec/facets/", "1-2-0/SchemaDatasetFacet.json");
27
28/// Dataset identity in OpenLineage (`namespace`, `name`).
29#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31pub struct OpenLineageDatasetId {
32    pub namespace: String,
33    pub name: String,
34}
35
36impl OpenLineageDatasetId {
37    pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
38        Self {
39            namespace: namespace.into(),
40            name: name.into(),
41        }
42    }
43}
44
/// Options shared by OpenLineage payload generation helpers.
///
/// Every field has a default (`#[serde(default)]` plus `Default`), so callers
/// only fill in what their entry point needs:
/// * [`openlineage_column_lineage`] requires a non-blank `producer`.
/// * [`openlineage_job_event`] additionally requires `job_namespace`,
///   `job_name` and `event_time`.
/// * [`openlineage_run_event`] additionally requires `run_id` and `event_type`.
///
/// Fields (de)serialize with camelCase keys (`datasetNamespace`, `outputDataset`, ...).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct OpenLineageOptions {
    /// Dialect used to parse the SQL; also reported in the `sql` job facet.
    #[serde(deserialize_with = "deserialize_dialect_type")]
    pub dialect: DialectType,
    /// Value written to every `producer` / `_producer` field. Must not be blank.
    pub producer: String,
    /// NOTE(review): presumably the namespace for tables without an entry in
    /// `dataset_mappings` — confirm against `dataset_from_table_name`.
    pub dataset_namespace: Option<String>,
    /// NOTE(review): presumably per-table overrides from SQL table name to
    /// dataset identity — confirm against `dataset_from_table_name`.
    pub dataset_mappings: BTreeMap<String, OpenLineageDatasetId>,
    /// Output dataset for a plain `SELECT`. Required when the statement has no
    /// `SELECT INTO` target; `INSERT` and `CREATE TABLE ... AS` take their
    /// output from the statement itself.
    pub output_dataset: Option<OpenLineageDatasetId>,
    /// Optional table metadata. Enables `SELECT *` expansion, schema-aware
    /// lineage resolution, and the `schema` facet on the output dataset.
    pub schema: Option<ValidationSchema>,
    /// Job namespace; required for job and run events.
    pub job_namespace: Option<String>,
    /// Job name; required for job and run events.
    pub job_name: Option<String>,
    /// Copied as-is into `eventTime` (its format is not validated here);
    /// required for job and run events.
    pub event_time: Option<String>,
    /// Copied into `run.runId`; required for run events.
    pub run_id: Option<String>,
    /// Run event type; required for run events.
    pub event_type: Option<OpenLineageRunEventType>,
}
62
/// OpenLineage run event type.
///
/// Serialized in SCREAMING_SNAKE_CASE (`START`, `RUNNING`, `COMPLETE`,
/// `ABORT`, `FAIL`, `OTHER`) and written to the RunEvent `eventType` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum OpenLineageRunEventType {
    Start,
    Running,
    Complete,
    Abort,
    Fail,
    Other,
}
74
75/// Non-fatal issue encountered while generating OpenLineage output.
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(rename_all = "camelCase")]
78pub struct OpenLineageWarning {
79    pub code: String,
80    pub message: String,
81}
82
83impl OpenLineageWarning {
84    fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
85        Self {
86            code: code.into(),
87            message: message.into(),
88        }
89    }
90}
91
/// Result of [`openlineage_column_lineage`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageColumnLineageResult {
    /// Standalone `columnLineage` facet, keyed by output field name.
    pub facet: ColumnLineageDatasetFacet,
    /// Datasets read by the statement (without facets).
    pub inputs: Vec<OpenLineageDataset>,
    /// The single written dataset, carrying the `columnLineage` facet and, when
    /// schema metadata covers it, a `schema` facet.
    pub outputs: Vec<OpenLineageDataset>,
    /// Non-fatal issues collected during generation.
    pub warnings: Vec<OpenLineageWarning>,
}

/// Result of [`openlineage_job_event`] and [`openlineage_run_event`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageEventResult {
    /// The complete event as a JSON value.
    pub event: Value,
    /// Non-fatal issues collected while building the event.
    pub warnings: Vec<OpenLineageWarning>,
}
107
108#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
109pub struct OpenLineageDataset {
110    pub namespace: String,
111    pub name: String,
112    #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
113    pub facets: BTreeMap<String, Value>,
114}
115
116impl std::convert::From<OpenLineageDatasetId> for OpenLineageDataset {
117    fn from(id: OpenLineageDatasetId) -> Self {
118        Self {
119            namespace: id.namespace,
120            name: id.name,
121            facets: BTreeMap::new(),
122        }
123    }
124}
125
/// The `columnLineage` dataset facet: for each output field, the input fields it derives from.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnLineageDatasetFacet {
    #[serde(rename = "_producer")]
    pub producer: String,
    #[serde(rename = "_schemaURL")]
    pub schema_url: String,
    /// Keyed by output field name; a `BTreeMap` keeps the JSON key order deterministic.
    pub fields: BTreeMap<String, ColumnLineageField>,
}

/// Lineage entry for one output field.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnLineageField {
    /// Serialized as `inputFields`.
    pub input_fields: Vec<OpenLineageInputField>,
}

/// A source column (`field`) within dataset (`namespace`, `name`) feeding an output field.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageInputField {
    pub namespace: String,
    pub name: String,
    pub field: String,
    /// Omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub transformations: Vec<OpenLineageTransformation>,
}

/// How an input field is transformed into the output field.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OpenLineageTransformation {
    /// Serialized as `type` (a Rust keyword, hence the trailing underscore).
    #[serde(rename = "type")]
    pub type_: String,
    pub subtype: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub masking: Option<bool>,
}
161
/// What `analyze_statement` extracts from a supported statement.
#[derive(Debug, Clone)]
struct StatementAnalysis {
    /// Query whose projections define the output columns.
    query: Expression,
    /// Distinct datasets read by `query`.
    inputs: Vec<OpenLineageDatasetId>,
    /// Dataset written by the statement.
    output: OpenLineageDatasetId,
    /// Explicit target column list (INSERT columns / CREATE TABLE columns); empty if none.
    output_column_names: Vec<String>,
}

/// One projected output column.
#[derive(Debug, Clone)]
struct OutputField {
    /// Name reported in the facet; may be replaced by a target column name.
    name: String,
    /// Name used to resolve lineage inside the query (the projection's own name).
    lineage_name: String,
    /// The projection expression; `None` for columns produced by `*` expansion.
    expression: Option<Expression>,
    /// Qualified source table for columns produced by `*` expansion.
    star_source_table: Option<String>,
}

/// A leaf column reached by lineage resolution: `table.field`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct TerminalField {
    table: String,
    field: String,
}
183
184/// Produce a standalone OpenLineage columnLineage facet plus inferred datasets.
185pub fn openlineage_column_lineage(
186    sql: &str,
187    options: &OpenLineageOptions,
188) -> Result<OpenLineageColumnLineageResult> {
189    validate_common_options(options)?;
190
191    let mut warnings = Vec::new();
192    let schema_mapping = options
193        .schema
194        .as_ref()
195        .map(mapping_schema_from_validation_schema);
196    let dialect = Dialect::get(options.dialect);
197    let mut expressions = dialect.parse(sql)?;
198    if expressions.len() != 1 {
199        return Err(Error::parse(
200            format!(
201                "OpenLineage generation expects exactly one statement, found {}",
202                expressions.len()
203            ),
204            0,
205            0,
206            0,
207            0,
208        ));
209    }
210
211    let expr = expressions.remove(0);
212    let analysis = analyze_statement(&expr, options, &mut warnings)?;
213    let mut output_fields = output_fields_for_query(
214        &analysis.query,
215        schema_mapping.as_ref().map(|s| s as &dyn Schema),
216        options.dialect,
217        &mut warnings,
218    )?;
219    apply_output_column_names(
220        &mut output_fields,
221        &analysis.output_column_names,
222        &mut warnings,
223    );
224
225    let mut fields = BTreeMap::new();
226    for output_field in output_fields {
227        if fields.contains_key(&output_field.name) {
228            warnings.push(OpenLineageWarning::new(
229                "W_DUPLICATE_OUTPUT_FIELD",
230                format!(
231                    "Duplicate output field '{}' was merged in the OpenLineage fields map",
232                    output_field.name
233                ),
234            ));
235        }
236
237        let input_fields = input_fields_for_output(
238            &analysis.query,
239            &output_field,
240            options,
241            schema_mapping.as_ref().map(|s| s as &dyn Schema),
242            &mut warnings,
243        )?;
244
245        fields.insert(output_field.name, ColumnLineageField { input_fields });
246    }
247
248    let mut outputs = vec![OpenLineageDataset::from(analysis.output.clone())];
249    attach_output_facets(&mut outputs[0], &analysis.output, options, &fields)?;
250
251    Ok(OpenLineageColumnLineageResult {
252        facet: ColumnLineageDatasetFacet {
253            producer: options.producer.clone(),
254            schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
255            fields,
256        },
257        inputs: analysis
258            .inputs
259            .into_iter()
260            .map(OpenLineageDataset::from)
261            .collect(),
262        outputs,
263        warnings,
264    })
265}
266
267/// Produce an OpenLineage JobEvent as JSON.
268pub fn openlineage_job_event(
269    sql: &str,
270    options: &OpenLineageOptions,
271) -> Result<OpenLineageEventResult> {
272    let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
273    let job_name = required_option(&options.job_name, "jobName")?;
274    let event_time = required_option(&options.event_time, "eventTime")?;
275
276    let result = openlineage_column_lineage(sql, options)?;
277    let event = json!({
278        "eventTime": event_time,
279        "producer": options.producer,
280        "schemaURL": OPENLINEAGE_SCHEMA_URL,
281        "job": {
282            "namespace": job_namespace,
283            "name": job_name,
284            "facets": job_facets(sql, options),
285        },
286        "inputs": result.inputs,
287        "outputs": result.outputs,
288    });
289
290    Ok(OpenLineageEventResult {
291        event,
292        warnings: result.warnings,
293    })
294}
295
296/// Produce an OpenLineage RunEvent as JSON.
297pub fn openlineage_run_event(
298    sql: &str,
299    options: &OpenLineageOptions,
300) -> Result<OpenLineageEventResult> {
301    let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
302    let job_name = required_option(&options.job_name, "jobName")?;
303    let event_time = required_option(&options.event_time, "eventTime")?;
304    let run_id = required_option(&options.run_id, "runId")?;
305    let event_type = options
306        .event_type
307        .ok_or_else(|| Error::parse("Missing required option: eventType", 0, 0, 0, 0))?;
308
309    let result = openlineage_column_lineage(sql, options)?;
310    let event = json!({
311        "eventTime": event_time,
312        "eventType": event_type,
313        "producer": options.producer,
314        "schemaURL": OPENLINEAGE_SCHEMA_URL,
315        "run": {
316            "runId": run_id,
317            "facets": {},
318        },
319        "job": {
320            "namespace": job_namespace,
321            "name": job_name,
322            "facets": job_facets(sql, options),
323        },
324        "inputs": result.inputs,
325        "outputs": result.outputs,
326    });
327
328    Ok(OpenLineageEventResult {
329        event,
330        warnings: result.warnings,
331    })
332}
333
334fn validate_common_options(options: &OpenLineageOptions) -> Result<()> {
335    if options.producer.trim().is_empty() {
336        return Err(Error::parse(
337            "Missing required option: producer",
338            0,
339            0,
340            0,
341            0,
342        ));
343    }
344    Ok(())
345}
346
347fn required_option(value: &Option<String>, name: &str) -> Result<String> {
348    match value.as_ref().filter(|v| !v.trim().is_empty()) {
349        Some(value) => Ok(value.clone()),
350        None => Err(Error::parse(
351            format!("Missing required option: {name}"),
352            0,
353            0,
354            0,
355            0,
356        )),
357    }
358}
359
360fn analyze_statement(
361    expr: &Expression,
362    options: &OpenLineageOptions,
363    warnings: &mut Vec<OpenLineageWarning>,
364) -> Result<StatementAnalysis> {
365    match expr {
366        Expression::Select(select) => {
367            let output = if let Some(into) = &select.into {
368                dataset_from_expression(&into.this, options)?
369            } else {
370                options.output_dataset.clone().ok_or_else(|| {
371                    Error::parse(
372                        "OpenLineage outputDataset is required for SELECT statements without SELECT INTO",
373                        0,
374                        0,
375                        0,
376                        0,
377                    )
378                })?
379            };
380            Ok(StatementAnalysis {
381                query: expr.clone(),
382                inputs: collect_input_datasets(expr, options, Some(&output), warnings)?,
383                output,
384                output_column_names: Vec::new(),
385            })
386        }
387        Expression::Insert(insert) => {
388            let output = dataset_from_table_ref(&insert.table, options)?;
389            let query = insert.query.clone().ok_or_else(|| {
390                Error::unsupported(
391                    "OpenLineage column lineage for INSERT without query",
392                    options.dialect.to_string(),
393                )
394            })?;
395            Ok(StatementAnalysis {
396                inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
397                query,
398                output,
399                output_column_names: insert.columns.iter().map(|col| col.name.clone()).collect(),
400            })
401        }
402        Expression::CreateTable(create) => {
403            let output = dataset_from_table_ref(&create.name, options)?;
404            let query = create.as_select.clone().ok_or_else(|| {
405                Error::unsupported(
406                    "OpenLineage column lineage for CREATE TABLE without AS SELECT",
407                    options.dialect.to_string(),
408                )
409            })?;
410            Ok(StatementAnalysis {
411                inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
412                query,
413                output,
414                output_column_names: create
415                    .columns
416                    .iter()
417                    .map(|col| col.name.name.clone())
418                    .collect(),
419            })
420        }
421        _ => Err(Error::unsupported(
422            format!("OpenLineage generation for {}", expr.variant_name()),
423            options.dialect.to_string(),
424        )),
425    }
426}
427
428fn output_fields_for_query(
429    query: &Expression,
430    schema: Option<&dyn Schema>,
431    dialect: DialectType,
432    warnings: &mut Vec<OpenLineageWarning>,
433) -> Result<Vec<OutputField>> {
434    let select = leftmost_select(query).ok_or_else(|| {
435        Error::unsupported(
436            "OpenLineage output field extraction for non-SELECT query",
437            dialect.to_string(),
438        )
439    })?;
440
441    let mut fields = Vec::new();
442    for (idx, expr) in select.expressions.iter().enumerate() {
443        if is_star_expr(expr) {
444            expand_star_output_fields(select, expr, schema, warnings, &mut fields);
445            continue;
446        }
447
448        let name = output_name(expr).unwrap_or_else(|| format!("_{idx}"));
449        fields.push(OutputField {
450            lineage_name: name.clone(),
451            name,
452            expression: Some(expr.clone()),
453            star_source_table: None,
454        });
455    }
456    Ok(fields)
457}
458
459fn apply_output_column_names(
460    fields: &mut [OutputField],
461    output_column_names: &[String],
462    warnings: &mut Vec<OpenLineageWarning>,
463) {
464    if output_column_names.is_empty() {
465        return;
466    }
467    if output_column_names.len() != fields.len() {
468        warnings.push(OpenLineageWarning::new(
469            "W_OUTPUT_COLUMN_COUNT_MISMATCH",
470            format!(
471                "Target column count ({}) does not match projected column count ({})",
472                output_column_names.len(),
473                fields.len()
474            ),
475        ));
476        return;
477    }
478    for (field, output_name) in fields.iter_mut().zip(output_column_names) {
479        field.name = output_name.clone();
480    }
481}
482
483fn input_fields_for_output(
484    query: &Expression,
485    output_field: &OutputField,
486    options: &OpenLineageOptions,
487    schema: Option<&dyn Schema>,
488    warnings: &mut Vec<OpenLineageWarning>,
489) -> Result<Vec<OpenLineageInputField>> {
490    if let Some(table) = &output_field.star_source_table {
491        return terminal_fields_to_openlineage(
492            vec![TerminalField {
493                table: table.clone(),
494                field: output_field.lineage_name.clone(),
495            }],
496            "IDENTITY",
497            Some(format!("SELECT {}", output_field.lineage_name)),
498            options,
499            warnings,
500        );
501    }
502
503    let lineage_result = if let Some(schema) = schema {
504        lineage::lineage_with_schema(
505            &output_field.lineage_name,
506            query,
507            Some(schema),
508            Some(options.dialect),
509            false,
510        )
511    } else {
512        lineage::lineage(
513            &output_field.lineage_name,
514            query,
515            Some(options.dialect),
516            false,
517        )
518    };
519
520    let node = match lineage_result {
521        Ok(node) => node,
522        Err(err) => {
523            warnings.push(OpenLineageWarning::new(
524                "W_UNRESOLVED_OUTPUT_FIELD",
525                format!(
526                    "Could not resolve lineage for output field '{}': {}",
527                    output_field.name, err
528                ),
529            ));
530            return Ok(Vec::new());
531        }
532    };
533
534    let mut terminals = BTreeSet::new();
535    collect_terminal_fields(&node, &mut terminals);
536    let terminals: Vec<TerminalField> = terminals.into_iter().collect();
537
538    if terminals.is_empty() {
539        warnings.push(OpenLineageWarning::new(
540            "W_EMPTY_FIELD_LINEAGE",
541            format!(
542                "No input fields were found for output field '{}'",
543                output_field.name
544            ),
545        ));
546        return Ok(Vec::new());
547    }
548
549    let subtype = transformation_subtype(output_field.expression.as_ref(), &terminals);
550    let description = output_field
551        .expression
552        .as_ref()
553        .map(|expr| expr.sql_for(options.dialect));
554
555    terminal_fields_to_openlineage(terminals, subtype, description, options, warnings)
556}
557
558fn terminal_fields_to_openlineage(
559    terminals: Vec<TerminalField>,
560    subtype: &str,
561    description: Option<String>,
562    options: &OpenLineageOptions,
563    warnings: &mut Vec<OpenLineageWarning>,
564) -> Result<Vec<OpenLineageInputField>> {
565    let mut result = Vec::new();
566    for terminal in terminals {
567        let dataset = dataset_from_table_name(&terminal.table, options).map_err(|err| {
568            warnings.push(OpenLineageWarning::new(
569                "W_UNRESOLVED_DATASET",
570                format!(
571                    "Could not map table '{}' to an OpenLineage dataset: {}",
572                    terminal.table, err
573                ),
574            ));
575            err
576        })?;
577        result.push(OpenLineageInputField {
578            namespace: dataset.namespace,
579            name: dataset.name,
580            field: terminal.field,
581            transformations: vec![OpenLineageTransformation {
582                type_: "DIRECT".to_string(),
583                subtype: subtype.to_string(),
584                description: description.clone(),
585                masking: Some(false),
586            }],
587        });
588    }
589    Ok(result)
590}
591
592fn transformation_subtype(expr: Option<&Expression>, terminals: &[TerminalField]) -> &'static str {
593    let Some(expr) = expr else {
594        return "TRANSFORMATION";
595    };
596    let unaliased = unalias(expr);
597    if expression_contains_aggregate(unaliased) {
598        return "AGGREGATION";
599    }
600    if terminals.len() == 1 {
601        if let Expression::Column(col) = unaliased {
602            if col.name.name == terminals[0].field {
603                return "IDENTITY";
604            }
605        }
606    }
607    "TRANSFORMATION"
608}
609
610fn collect_terminal_fields(node: &LineageNode, terminals: &mut BTreeSet<TerminalField>) {
611    if node.downstream.is_empty() {
612        if let Expression::Column(column) = &node.expression {
613            let table = if !node.source_name.is_empty() {
614                Some(node.source_name.clone())
615            } else if let Expression::Table(table) = &node.source {
616                Some(table_ref_qualified_name(table))
617            } else {
618                column.table.as_ref().map(|t| t.name.clone())
619            };
620            if let Some(table) = table.filter(|t| !t.is_empty()) {
621                terminals.insert(TerminalField {
622                    table,
623                    field: column.name.name.clone(),
624                });
625            }
626        }
627        return;
628    }
629
630    for child in &node.downstream {
631        collect_terminal_fields(child, terminals);
632    }
633}
634
/// Returns `true` when `Expression::contains` finds any node that is one of
/// the aggregate-function variants listed below.
///
/// NOTE(review): assumes `contains` checks `expr` itself and all nested
/// nodes — confirm in `traversal`. New aggregate variants added to
/// `Expression` must be added here too, or such projections will be
/// classified as `TRANSFORMATION` instead of `AGGREGATION`.
fn expression_contains_aggregate(expr: &Expression) -> bool {
    expr.contains(|node| {
        matches!(
            node,
            Expression::AggregateFunction(_)
                | Expression::Sum(_)
                | Expression::Count(_)
                | Expression::Avg(_)
                | Expression::Min(_)
                | Expression::Max(_)
                | Expression::GroupConcat(_)
                | Expression::StringAgg(_)
                | Expression::ListAgg(_)
                | Expression::ArrayAgg(_)
                | Expression::CountIf(_)
                | Expression::SumIf(_)
                | Expression::Stddev(_)
                | Expression::StddevPop(_)
                | Expression::StddevSamp(_)
                | Expression::Variance(_)
                | Expression::VarPop(_)
                | Expression::VarSamp(_)
                | Expression::Median(_)
                | Expression::Mode(_)
                | Expression::First(_)
                | Expression::Last(_)
                | Expression::AnyValue(_)
                | Expression::ApproxDistinct(_)
                | Expression::ApproxCountDistinct(_)
                | Expression::ApproxPercentile(_)
                | Expression::Percentile(_)
                | Expression::LogicalAnd(_)
                | Expression::LogicalOr(_)
                | Expression::Skewness(_)
                | Expression::BitwiseCount(_)
                | Expression::ArrayConcatAgg(_)
                | Expression::ArrayUniqueAgg(_)
                | Expression::BoolXorAgg(_)
                | Expression::ParameterizedAgg(_)
                | Expression::ArgMax(_)
                | Expression::ArgMin(_)
                | Expression::ApproxTopK(_)
                | Expression::ApproxTopKAccumulate(_)
                | Expression::ApproxTopKCombine(_)
                | Expression::ApproxTopKEstimate(_)
                | Expression::ApproxTopSum(_)
                | Expression::ApproxQuantiles(_)
                | Expression::Grouping(_)
                | Expression::GroupingId(_)
                | Expression::AnonymousAggFunc(_)
                | Expression::CombinedAggFunc(_)
                | Expression::CombinedParameterizedAgg(_)
                | Expression::HashAgg(_)
                | Expression::ObjectAgg(_)
                | Expression::AIAgg(_)
        )
    })
}
693
694fn collect_input_datasets(
695    expr: &Expression,
696    options: &OpenLineageOptions,
697    output: Option<&OpenLineageDatasetId>,
698    warnings: &mut Vec<OpenLineageWarning>,
699) -> Result<Vec<OpenLineageDatasetId>> {
700    let cte_aliases = collect_cte_aliases(expr, options.dialect);
701    let mut seen = BTreeSet::new();
702    let mut result = Vec::new();
703
704    for table in expr.dfs().filter_map(|node| match node {
705        Expression::Table(table) => Some(table),
706        _ => None,
707    }) {
708        let qname = table_ref_qualified_name(table);
709        let normalized = normalize_identifier(&table.name.name, options.dialect, true);
710        if cte_aliases.contains(&normalized) {
711            continue;
712        }
713        if output
714            .map(|out| out.name == qname || out.name == table.name.name)
715            .unwrap_or(false)
716        {
717            continue;
718        }
719        match dataset_from_table_name(&qname, options) {
720            Ok(dataset) => {
721                if seen.insert((dataset.namespace.clone(), dataset.name.clone())) {
722                    result.push(dataset);
723                }
724            }
725            Err(err) => warnings.push(OpenLineageWarning::new(
726                "W_UNRESOLVED_DATASET",
727                format!("Could not map input table '{qname}': {err}"),
728            )),
729        }
730    }
731
732    Ok(result)
733}
734
735fn attach_output_facets(
736    output: &mut OpenLineageDataset,
737    output_id: &OpenLineageDatasetId,
738    options: &OpenLineageOptions,
739    fields: &BTreeMap<String, ColumnLineageField>,
740) -> Result<()> {
741    let column_lineage = ColumnLineageDatasetFacet {
742        producer: options.producer.clone(),
743        schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
744        fields: fields.clone(),
745    };
746    output.facets.insert(
747        "columnLineage".to_string(),
748        serde_json::to_value(column_lineage).map_err(openlineage_serialization_error)?,
749    );
750
751    if let Some(schema_facet) = schema_facet_for_dataset(output_id, options) {
752        output.facets.insert(
753            "schema".to_string(),
754            serde_json::to_value(schema_facet).map_err(openlineage_serialization_error)?,
755        );
756    }
757
758    Ok(())
759}
760
761fn job_facets(sql: &str, options: &OpenLineageOptions) -> Value {
762    json!({
763        "sql": {
764            "_producer": options.producer,
765            "_schemaURL": SQL_JOB_FACET_SCHEMA_URL,
766            "query": sql,
767            "dialect": options.dialect.to_string(),
768        },
769        "jobType": {
770            "_producer": options.producer,
771            "_schemaURL": JOB_TYPE_JOB_FACET_SCHEMA_URL,
772            "processingType": "BATCH",
773            "integration": "POLYGLOT_SQL",
774            "jobType": "QUERY",
775        }
776    })
777}
778
/// `schema` dataset facet, attached to the output dataset when
/// `OpenLineageOptions::schema` describes the output table.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacet {
    #[serde(rename = "_producer")]
    producer: String,
    #[serde(rename = "_schemaURL")]
    schema_url: String,
    /// One entry per column, in schema order.
    fields: Vec<SchemaDatasetFacetField>,
}

/// One column of a [`SchemaDatasetFacet`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacetField {
    name: String,
    /// Column type, serialized as `type`; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    #[serde(rename = "type")]
    data_type: String,
    /// 1-based column position; omitted when `None`.
    // NOTE(review): this struct has no `rename_all`, so the key is emitted as
    // `ordinal_position` — confirm that matches the SchemaDatasetFacet version
    // referenced by `SCHEMA_DATASET_FACET_SCHEMA_URL`.
    #[serde(skip_serializing_if = "Option::is_none")]
    ordinal_position: Option<usize>,
}
797
798fn schema_facet_for_dataset(
799    output: &OpenLineageDatasetId,
800    options: &OpenLineageOptions,
801) -> Option<SchemaDatasetFacet> {
802    let schema = options.schema.as_ref()?;
803    let table = schema.tables.iter().find(|table| {
804        let qname = if let Some(schema_name) = &table.schema {
805            format!("{}.{}", schema_name, table.name)
806        } else {
807            table.name.clone()
808        };
809        output.name == table.name || output.name == qname
810    })?;
811
812    Some(SchemaDatasetFacet {
813        producer: options.producer.clone(),
814        schema_url: SCHEMA_DATASET_FACET_SCHEMA_URL.to_string(),
815        fields: table
816            .columns
817            .iter()
818            .enumerate()
819            .map(|(idx, col)| SchemaDatasetFacetField {
820                name: col.name.clone(),
821                data_type: col.data_type.clone(),
822                ordinal_position: Some(idx + 1),
823            })
824            .collect(),
825    })
826}
827
828fn expand_star_output_fields(
829    select: &Select,
830    star_expr: &Expression,
831    schema: Option<&dyn Schema>,
832    warnings: &mut Vec<OpenLineageWarning>,
833    fields: &mut Vec<OutputField>,
834) {
835    let Some(schema) = schema else {
836        warnings.push(OpenLineageWarning::new(
837            "W_STAR_WITHOUT_SCHEMA",
838            "SELECT * cannot be expanded into OpenLineage column lineage without schema metadata",
839        ));
840        return;
841    };
842
843    let qualifier = star_qualifier(star_expr);
844    let sources = select_source_tables(select);
845    for (alias, qname) in sources {
846        if qualifier
847            .as_ref()
848            .map(|q| q != &alias && q != &qname)
849            .unwrap_or(false)
850        {
851            continue;
852        }
853        match schema.column_names(&qname) {
854            Ok(columns) => {
855                for name in columns {
856                    fields.push(OutputField {
857                        lineage_name: name.clone(),
858                        name,
859                        expression: None,
860                        star_source_table: Some(qname.clone()),
861                    });
862                }
863            }
864            Err(err) => warnings.push(OpenLineageWarning::new(
865                "W_STAR_SCHEMA_LOOKUP_FAILED",
866                format!("Could not expand SELECT * for table '{}': {}", qname, err),
867            )),
868        }
869    }
870}
871
872fn select_source_tables(select: &Select) -> Vec<(String, String)> {
873    let mut result = Vec::new();
874    if let Some(from) = &select.from {
875        for expr in &from.expressions {
876            collect_source_table(expr, &mut result);
877        }
878    }
879    for join in &select.joins {
880        collect_source_table(&join.this, &mut result);
881    }
882    result
883}
884
885fn collect_source_table(expr: &Expression, result: &mut Vec<(String, String)>) {
886    match expr {
887        Expression::Table(table) => {
888            let qname = table_ref_qualified_name(table);
889            let alias = table
890                .alias
891                .as_ref()
892                .map(|a| a.name.clone())
893                .unwrap_or_else(|| table.name.name.clone());
894            result.push((alias, qname));
895        }
896        Expression::Alias(alias) => collect_source_table(&alias.this, result),
897        Expression::Paren(paren) => collect_source_table(&paren.this, result),
898        _ => {}
899    }
900}
901
902fn leftmost_select(expr: &Expression) -> Option<&Select> {
903    match expr {
904        Expression::Select(select) => Some(select),
905        Expression::Union(union) => leftmost_select(&union.left),
906        Expression::Intersect(intersect) => leftmost_select(&intersect.left),
907        Expression::Except(except) => leftmost_select(&except.left),
908        Expression::Subquery(subquery) => leftmost_select(&subquery.this),
909        _ => None,
910    }
911}
912
913fn output_name(expr: &Expression) -> Option<String> {
914    match expr {
915        Expression::Alias(alias) => Some(alias.alias.name.clone()),
916        Expression::Column(col) => Some(col.name.name.clone()),
917        Expression::Identifier(id) => Some(id.name.clone()),
918        Expression::Annotated(a) => output_name(&a.this),
919        _ => None,
920    }
921}
922
923fn unalias(expr: &Expression) -> &Expression {
924    match expr {
925        Expression::Alias(alias) => &alias.this,
926        Expression::Annotated(a) => unalias(&a.this),
927        _ => expr,
928    }
929}
930
931fn is_star_expr(expr: &Expression) -> bool {
932    matches!(expr, Expression::Star(_))
933        || matches!(expr, Expression::Column(col) if col.name.name == "*")
934}
935
936fn star_qualifier(expr: &Expression) -> Option<String> {
937    match expr {
938        Expression::Star(star) => star.table.as_ref().map(|t| t.name.clone()),
939        Expression::Column(col) if col.name.name == "*" => {
940            col.table.as_ref().map(|t| t.name.clone())
941        }
942        _ => None,
943    }
944}
945
946fn dataset_from_expression(
947    expr: &Expression,
948    options: &OpenLineageOptions,
949) -> Result<OpenLineageDatasetId> {
950    match expr {
951        Expression::Table(table) => dataset_from_table_ref(table, options),
952        Expression::Identifier(id) => dataset_from_table_name(&id.name, options),
953        _ => Err(Error::unsupported(
954            "OpenLineage dataset extraction from non-table expression",
955            options.dialect.to_string(),
956        )),
957    }
958}
959
960fn dataset_from_table_ref(
961    table: &TableRef,
962    options: &OpenLineageOptions,
963) -> Result<OpenLineageDatasetId> {
964    dataset_from_table_name(&table_ref_qualified_name(table), options)
965}
966
967fn dataset_from_table_name(
968    table_name: &str,
969    options: &OpenLineageOptions,
970) -> Result<OpenLineageDatasetId> {
971    if let Some(mapped) = options.dataset_mappings.get(table_name) {
972        return Ok(mapped.clone());
973    }
974    let namespace = options.dataset_namespace.as_ref().ok_or_else(|| {
975        Error::parse(
976            format!(
977                "Missing datasetNamespace or explicit dataset mapping for table '{}'",
978                table_name
979            ),
980            0,
981            0,
982            0,
983            0,
984        )
985    })?;
986    Ok(OpenLineageDatasetId::new(namespace, table_name))
987}
988
989fn table_ref_qualified_name(table: &TableRef) -> String {
990    let mut parts = Vec::new();
991    if let Some(catalog) = &table.catalog {
992        parts.push(catalog.name.clone());
993    }
994    if let Some(schema) = &table.schema {
995        parts.push(schema.name.clone());
996    }
997    parts.push(table.name.name.clone());
998    parts.join(".")
999}
1000
1001fn collect_cte_aliases(expr: &Expression, dialect: DialectType) -> HashSet<String> {
1002    let mut aliases = HashSet::new();
1003    for node in expr.dfs() {
1004        match node {
1005            Expression::Select(select) => {
1006                if let Some(with) = &select.with {
1007                    collect_with_aliases(with, dialect, &mut aliases);
1008                }
1009            }
1010            Expression::Union(union) => {
1011                if let Some(with) = &union.with {
1012                    collect_with_aliases(with, dialect, &mut aliases);
1013                }
1014            }
1015            Expression::Intersect(intersect) => {
1016                if let Some(with) = &intersect.with {
1017                    collect_with_aliases(with, dialect, &mut aliases);
1018                }
1019            }
1020            Expression::Except(except) => {
1021                if let Some(with) = &except.with {
1022                    collect_with_aliases(with, dialect, &mut aliases);
1023                }
1024            }
1025            _ => {}
1026        }
1027    }
1028    aliases
1029}
1030
1031fn collect_with_aliases(with: &With, dialect: DialectType, aliases: &mut HashSet<String>) {
1032    for cte in &with.ctes {
1033        aliases.insert(normalize_identifier(&cte.alias.name, dialect, true));
1034    }
1035}
1036
1037fn normalize_identifier(name: &str, dialect: DialectType, is_table: bool) -> String {
1038    crate::schema::normalize_name(name, Some(dialect), is_table, true)
1039}
1040
1041fn openlineage_serialization_error(err: serde_json::Error) -> Error {
1042    Error::internal(format!("OpenLineage serialization failed: {err}"))
1043}
1044
1045fn deserialize_dialect_type<'de, D>(deserializer: D) -> std::result::Result<DialectType, D::Error>
1046where
1047    D: Deserializer<'de>,
1048{
1049    let value = String::deserialize(deserializer)?;
1050    value.parse::<DialectType>().map_err(de::Error::custom)
1051}
1052
#[cfg(test)]
mod tests {
    // Unit tests for OpenLineage column-lineage, job-event and run-event generation.
    use super::*;

    // Baseline options shared by most tests.
    // - PostgreSQL dialect.
    // - A fixed dataset namespace.
    // - An explicit output dataset `analytics.out`.
    // - Pinned job names, event time, run id and event type, so the generated
    //   payloads are deterministic.
    fn options() -> OpenLineageOptions {
        OpenLineageOptions {
            dialect: DialectType::PostgreSQL,
            producer: "https://github.com/tobilg/polyglot".to_string(),
            dataset_namespace: Some("postgres://warehouse".to_string()),
            output_dataset: Some(OpenLineageDatasetId::new(
                "postgres://warehouse",
                "analytics.out",
            )),
            job_namespace: Some("polyglot-tests".to_string()),
            job_name: Some("lineage-test".to_string()),
            event_time: Some("2026-05-18T00:00:00Z".to_string()),
            run_id: Some("3b452093-782c-4ef2-9c0c-aafe2aa6f34d".to_string()),
            event_type: Some(OpenLineageRunEventType::Complete),
            ..Default::default()
        }
    }

    // The dialect alias "postgres" in JSON options deserializes to PostgreSQL.
    #[test]
    fn deserializes_dialect_aliases_in_options() {
        let options: OpenLineageOptions =
            serde_json::from_str(r#"{"producer":"polyglot","dialect":"postgres"}"#)
                .expect("options");
        assert_eq!(options.dialect, DialectType::PostgreSQL);
    }

    // A plain column passthrough yields one input field tagged IDENTITY.
    #[test]
    fn emits_identity_column_lineage_for_select() {
        let result = openlineage_column_lineage("SELECT a FROM t", &options()).expect("lineage");
        let field = result.facet.fields.get("a").expect("field a");
        assert_eq!(field.input_fields.len(), 1);
        assert_eq!(field.input_fields[0].name, "t");
        assert_eq!(field.input_fields[0].field, "a");
        assert_eq!(field.input_fields[0].transformations[0].subtype, "IDENTITY");
    }

    // A table alias (`orders o`) is resolved back to the real input dataset name.
    #[test]
    fn resolves_input_dataset_behind_table_alias() {
        let result = openlineage_column_lineage("SELECT o.total FROM orders o", &options())
            .expect("lineage");
        let field = result.facet.fields.get("total").expect("field total");
        assert_eq!(field.input_fields[0].name, "orders");
        assert_eq!(field.input_fields[0].field, "total");
    }

    // A computed expression lists every contributing column, each tagged TRANSFORMATION.
    #[test]
    fn emits_transformation_column_lineage_for_expression() {
        let result =
            openlineage_column_lineage("SELECT a + b AS c FROM t", &options()).expect("lineage");
        let field = result.facet.fields.get("c").expect("field c");
        assert_eq!(field.input_fields.len(), 2);
        assert!(field.input_fields.iter().any(|f| f.field == "a"));
        assert!(field.input_fields.iter().any(|f| f.field == "b"));
        assert!(field
            .input_fields
            .iter()
            .all(|f| f.transformations[0].subtype == "TRANSFORMATION"));
    }

    // An aggregate function over a column is tagged AGGREGATION.
    #[test]
    fn emits_aggregation_column_lineage() {
        let result =
            openlineage_column_lineage("SELECT SUM(amount) AS total FROM orders", &options())
                .expect("lineage");
        let field = result.facet.fields.get("total").expect("field total");
        assert_eq!(field.input_fields[0].field, "amount");
        assert_eq!(
            field.input_fields[0].transformations[0].subtype,
            "AGGREGATION"
        );
    }

    // Without an explicit output dataset, the INSERT target becomes the output.
    // The SELECT's FROM table becomes the input.
    #[test]
    fn infers_insert_output_dataset() {
        let mut opts = options();
        opts.output_dataset = None;
        let result =
            openlineage_column_lineage("INSERT INTO analytics.out SELECT a FROM raw.input", &opts)
                .expect("lineage");
        assert_eq!(result.outputs[0].name, "analytics.out");
        assert_eq!(result.inputs[0].name, "raw.input");
    }

    // An INSERT column list renames output fields. The facet is keyed by the
    // target column, not by the source projection name.
    #[test]
    fn maps_insert_target_columns_to_output_fields() {
        let mut opts = options();
        opts.output_dataset = None;
        let result = openlineage_column_lineage(
            "INSERT INTO analytics.out (target_a) SELECT source_a FROM raw.input",
            &opts,
        )
        .expect("lineage");
        let field = result.facet.fields.get("target_a").expect("target field");
        assert_eq!(field.input_fields[0].field, "source_a");
        assert!(!result.facet.fields.contains_key("source_a"));
    }

    // A bare SELECT has no inferable target, so a missing output dataset is an error.
    #[test]
    fn pure_select_requires_output_dataset() {
        let mut opts = options();
        opts.output_dataset = None;
        let err = openlineage_column_lineage("SELECT a FROM t", &opts).unwrap_err();
        assert!(err.to_string().contains("outputDataset is required"));
    }

    // The job event carries the configured job namespace and the SQL facet schema URL.
    // It also embeds column lineage under the output dataset's facets.
    #[test]
    fn emits_job_event_payload() {
        let result = openlineage_job_event("SELECT a FROM t", &options()).expect("event");
        assert_eq!(result.event["job"]["namespace"], "polyglot-tests");
        assert_eq!(
            result.event["job"]["facets"]["sql"]["_schemaURL"],
            SQL_JOB_FACET_SCHEMA_URL
        );
        assert_eq!(
            result.event["outputs"][0]["facets"]["columnLineage"]["fields"]["a"]["inputFields"][0]
                ["field"],
            "a"
        );
    }

    // The run event serializes the configured event type and run id.
    #[test]
    fn emits_run_event_payload() {
        let result = openlineage_run_event("SELECT a FROM t", &options()).expect("event");
        assert_eq!(result.event["eventType"], "COMPLETE");
        assert_eq!(
            result.event["run"]["runId"],
            "3b452093-782c-4ef2-9c0c-aafe2aa6f34d"
        );
    }

    // Without a schema, `SELECT *` cannot be expanded.
    // No fields are emitted, and a W_STAR_WITHOUT_SCHEMA warning is raised instead.
    #[test]
    fn select_star_without_schema_warns() {
        let result = openlineage_column_lineage("SELECT * FROM t", &options()).expect("lineage");
        assert!(result.facet.fields.is_empty());
        assert!(result
            .warnings
            .iter()
            .any(|w| w.code == "W_STAR_WITHOUT_SCHEMA"));
    }

    // Given a schema for table `t` with columns `a` (INT) and `b` (TEXT),
    // `SELECT *` expands to one output field per column.
    #[test]
    fn select_star_with_schema_expands_fields() {
        let mut opts = options();
        opts.schema = Some(ValidationSchema {
            strict: None,
            tables: vec![crate::validation::SchemaTable {
                name: "t".to_string(),
                schema: None,
                columns: vec![
                    crate::validation::SchemaColumn {
                        name: "a".to_string(),
                        data_type: "INT".to_string(),
                        nullable: None,
                        primary_key: false,
                        unique: false,
                        references: None,
                    },
                    crate::validation::SchemaColumn {
                        name: "b".to_string(),
                        data_type: "TEXT".to_string(),
                        nullable: None,
                        primary_key: false,
                        unique: false,
                        references: None,
                    },
                ],
                aliases: vec![],
                primary_key: vec![],
                unique_keys: vec![],
                foreign_keys: vec![],
            }],
        });

        let result = openlineage_column_lineage("SELECT * FROM t", &opts).expect("lineage");
        assert!(result.facet.fields.contains_key("a"));
        assert!(result.facet.fields.contains_key("b"));
    }
}