Skip to main content

polyglot_sql/
openlineage.rs

1//! OpenLineage-compatible payload generation.
2//!
3//! This module only builds OpenLineage JSON-compatible structures from SQL
4//! analysis. It deliberately does not implement transports, clients, retries,
5//! buffering, or runtime lifecycle management.
6
7use crate::dialects::{Dialect, DialectType};
8use crate::expressions::*;
9use crate::lineage::{self, LineageNode};
10use crate::schema::Schema;
11use crate::traversal::ExpressionWalk;
12use crate::{mapping_schema_from_validation_schema, Error, Result, ValidationSchema};
13use serde::de::{self, Deserializer};
14use serde::{Deserialize, Serialize};
15use serde_json::{json, Value};
16use std::collections::{BTreeMap, BTreeSet, HashSet};
17
// Expands a `<version>/<Facet>.json` suffix into an absolute OpenLineage facet
// schema URL at compile time, so all facet URLs share one base path.
macro_rules! openlineage_facet_schema_url {
    ($suffix:literal) => {
        concat!("https://openlineage.io/spec/facets/", $suffix)
    };
}

/// `schemaURL` of the OpenLineage core spec declared by generated events.
pub const OPENLINEAGE_SCHEMA_URL: &str = "https://openlineage.io/spec/2-0-2/OpenLineage.json";
/// `_schemaURL` of the `columnLineage` dataset facet.
pub const COLUMN_LINEAGE_FACET_SCHEMA_URL: &str =
    openlineage_facet_schema_url!("1-2-0/ColumnLineageDatasetFacet.json");
/// `_schemaURL` of the `sql` job facet.
pub const SQL_JOB_FACET_SCHEMA_URL: &str = openlineage_facet_schema_url!("1-1-0/SQLJobFacet.json");
/// `_schemaURL` of the `jobType` job facet.
pub const JOB_TYPE_JOB_FACET_SCHEMA_URL: &str =
    openlineage_facet_schema_url!("2-0-3/JobTypeJobFacet.json");
/// `_schemaURL` of the `schema` dataset facet.
pub const SCHEMA_DATASET_FACET_SCHEMA_URL: &str =
    openlineage_facet_schema_url!("1-2-0/SchemaDatasetFacet.json");
27
28/// Dataset identity in OpenLineage (`namespace`, `name`).
29#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
30#[serde(rename_all = "camelCase")]
31pub struct OpenLineageDatasetId {
32    pub namespace: String,
33    pub name: String,
34}
35
36impl OpenLineageDatasetId {
37    pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
38        Self {
39            namespace: namespace.into(),
40            name: name.into(),
41        }
42    }
43}
44
45/// Options shared by OpenLineage payload generation helpers.
46#[derive(Debug, Clone, Serialize, Deserialize, Default)]
47#[serde(rename_all = "camelCase", default)]
48pub struct OpenLineageOptions {
49    #[serde(deserialize_with = "deserialize_dialect_type")]
50    pub dialect: DialectType,
51    pub producer: String,
52    pub dataset_namespace: Option<String>,
53    pub dataset_mappings: BTreeMap<String, OpenLineageDatasetId>,
54    pub output_dataset: Option<OpenLineageDatasetId>,
55    pub schema: Option<ValidationSchema>,
56    pub job_namespace: Option<String>,
57    pub job_name: Option<String>,
58    pub event_time: Option<String>,
59    pub run_id: Option<String>,
60    pub event_type: Option<OpenLineageRunEventType>,
61}
62
63/// OpenLineage run event type.
64#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
65#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
66pub enum OpenLineageRunEventType {
67    Start,
68    Running,
69    Complete,
70    Abort,
71    Fail,
72    Other,
73}
74
75/// Non-fatal issue encountered while generating OpenLineage output.
76#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(rename_all = "camelCase")]
78pub struct OpenLineageWarning {
79    pub code: String,
80    pub message: String,
81}
82
83impl OpenLineageWarning {
84    fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
85        Self {
86            code: code.into(),
87            message: message.into(),
88        }
89    }
90}
91
92#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
93#[serde(rename_all = "camelCase")]
94pub struct OpenLineageColumnLineageResult {
95    pub facet: ColumnLineageDatasetFacet,
96    pub inputs: Vec<OpenLineageDataset>,
97    pub outputs: Vec<OpenLineageDataset>,
98    pub warnings: Vec<OpenLineageWarning>,
99}
100
101#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
102#[serde(rename_all = "camelCase")]
103pub struct OpenLineageEventResult {
104    pub event: Value,
105    pub warnings: Vec<OpenLineageWarning>,
106}
107
108#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
109pub struct OpenLineageDataset {
110    pub namespace: String,
111    pub name: String,
112    #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
113    pub facets: BTreeMap<String, Value>,
114}
115
116impl std::convert::From<OpenLineageDatasetId> for OpenLineageDataset {
117    fn from(id: OpenLineageDatasetId) -> Self {
118        Self {
119            namespace: id.namespace,
120            name: id.name,
121            facets: BTreeMap::new(),
122        }
123    }
124}
125
126#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
127pub struct ColumnLineageDatasetFacet {
128    #[serde(rename = "_producer")]
129    pub producer: String,
130    #[serde(rename = "_schemaURL")]
131    pub schema_url: String,
132    pub fields: BTreeMap<String, ColumnLineageField>,
133}
134
135#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
136#[serde(rename_all = "camelCase")]
137pub struct ColumnLineageField {
138    pub input_fields: Vec<OpenLineageInputField>,
139}
140
141#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
142#[serde(rename_all = "camelCase")]
143pub struct OpenLineageInputField {
144    pub namespace: String,
145    pub name: String,
146    pub field: String,
147    #[serde(skip_serializing_if = "Vec::is_empty", default)]
148    pub transformations: Vec<OpenLineageTransformation>,
149}
150
151#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
152pub struct OpenLineageTransformation {
153    #[serde(rename = "type")]
154    pub type_: String,
155    pub subtype: String,
156    #[serde(skip_serializing_if = "Option::is_none")]
157    pub description: Option<String>,
158    #[serde(skip_serializing_if = "Option::is_none")]
159    pub masking: Option<bool>,
160}
161
162#[derive(Debug, Clone)]
163struct StatementAnalysis {
164    query: Expression,
165    inputs: Vec<OpenLineageDatasetId>,
166    output: OpenLineageDatasetId,
167    output_column_names: Vec<String>,
168}
169
170#[derive(Debug, Clone)]
171struct OutputField {
172    name: String,
173    lineage_name: String,
174    expression: Option<Expression>,
175    star_source_table: Option<String>,
176}
177
178#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
179struct TerminalField {
180    table: String,
181    field: String,
182}
183
184/// Produce a standalone OpenLineage columnLineage facet plus inferred datasets.
185pub fn openlineage_column_lineage(
186    sql: &str,
187    options: &OpenLineageOptions,
188) -> Result<OpenLineageColumnLineageResult> {
189    validate_common_options(options)?;
190
191    let mut warnings = Vec::new();
192    let schema_mapping = options
193        .schema
194        .as_ref()
195        .map(mapping_schema_from_validation_schema);
196    let dialect = Dialect::get(options.dialect);
197    let mut expressions = dialect.parse(sql)?;
198    if expressions.len() != 1 {
199        return Err(Error::parse(
200            format!(
201                "OpenLineage generation expects exactly one statement, found {}",
202                expressions.len()
203            ),
204            0,
205            0,
206            0,
207            0,
208        ));
209    }
210
211    let expr = expressions.remove(0);
212    let analysis = analyze_statement(&expr, options, &mut warnings)?;
213    let mut output_fields = output_fields_for_query(
214        &analysis.query,
215        schema_mapping.as_ref().map(|s| s as &dyn Schema),
216        options.dialect,
217        &mut warnings,
218    )?;
219    apply_output_column_names(
220        &mut output_fields,
221        &analysis.output_column_names,
222        &mut warnings,
223    );
224
225    let mut fields = BTreeMap::new();
226    for output_field in output_fields {
227        if fields.contains_key(&output_field.name) {
228            warnings.push(OpenLineageWarning::new(
229                "W_DUPLICATE_OUTPUT_FIELD",
230                format!(
231                    "Duplicate output field '{}' was merged in the OpenLineage fields map",
232                    output_field.name
233                ),
234            ));
235        }
236
237        let input_fields = input_fields_for_output(
238            &analysis.query,
239            &output_field,
240            options,
241            schema_mapping.as_ref().map(|s| s as &dyn Schema),
242            &mut warnings,
243        )?;
244
245        fields.insert(output_field.name, ColumnLineageField { input_fields });
246    }
247
248    let mut outputs = vec![OpenLineageDataset::from(analysis.output.clone())];
249    attach_output_facets(&mut outputs[0], &analysis.output, options, &fields)?;
250
251    Ok(OpenLineageColumnLineageResult {
252        facet: ColumnLineageDatasetFacet {
253            producer: options.producer.clone(),
254            schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
255            fields,
256        },
257        inputs: analysis
258            .inputs
259            .into_iter()
260            .map(OpenLineageDataset::from)
261            .collect(),
262        outputs,
263        warnings,
264    })
265}
266
267/// Produce an OpenLineage JobEvent as JSON.
268pub fn openlineage_job_event(
269    sql: &str,
270    options: &OpenLineageOptions,
271) -> Result<OpenLineageEventResult> {
272    let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
273    let job_name = required_option(&options.job_name, "jobName")?;
274    let event_time = required_option(&options.event_time, "eventTime")?;
275
276    let result = openlineage_column_lineage(sql, options)?;
277    let event = json!({
278        "eventTime": event_time,
279        "producer": options.producer,
280        "schemaURL": OPENLINEAGE_SCHEMA_URL,
281        "job": {
282            "namespace": job_namespace,
283            "name": job_name,
284            "facets": job_facets(sql, options),
285        },
286        "inputs": result.inputs,
287        "outputs": result.outputs,
288    });
289
290    Ok(OpenLineageEventResult {
291        event,
292        warnings: result.warnings,
293    })
294}
295
296/// Produce an OpenLineage RunEvent as JSON.
297pub fn openlineage_run_event(
298    sql: &str,
299    options: &OpenLineageOptions,
300) -> Result<OpenLineageEventResult> {
301    let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
302    let job_name = required_option(&options.job_name, "jobName")?;
303    let event_time = required_option(&options.event_time, "eventTime")?;
304    let run_id = required_option(&options.run_id, "runId")?;
305    let event_type = options
306        .event_type
307        .ok_or_else(|| Error::parse("Missing required option: eventType", 0, 0, 0, 0))?;
308
309    let result = openlineage_column_lineage(sql, options)?;
310    let event = json!({
311        "eventTime": event_time,
312        "eventType": event_type,
313        "producer": options.producer,
314        "schemaURL": OPENLINEAGE_SCHEMA_URL,
315        "run": {
316            "runId": run_id,
317            "facets": {},
318        },
319        "job": {
320            "namespace": job_namespace,
321            "name": job_name,
322            "facets": job_facets(sql, options),
323        },
324        "inputs": result.inputs,
325        "outputs": result.outputs,
326    });
327
328    Ok(OpenLineageEventResult {
329        event,
330        warnings: result.warnings,
331    })
332}
333
334fn validate_common_options(options: &OpenLineageOptions) -> Result<()> {
335    if options.producer.trim().is_empty() {
336        return Err(Error::parse(
337            "Missing required option: producer",
338            0,
339            0,
340            0,
341            0,
342        ));
343    }
344    Ok(())
345}
346
347fn required_option(value: &Option<String>, name: &str) -> Result<String> {
348    match value.as_ref().filter(|v| !v.trim().is_empty()) {
349        Some(value) => Ok(value.clone()),
350        None => Err(Error::parse(
351            format!("Missing required option: {name}"),
352            0,
353            0,
354            0,
355            0,
356        )),
357    }
358}
359
360fn analyze_statement(
361    expr: &Expression,
362    options: &OpenLineageOptions,
363    warnings: &mut Vec<OpenLineageWarning>,
364) -> Result<StatementAnalysis> {
365    match expr {
366        Expression::Select(select) => {
367            let output = if let Some(into) = &select.into {
368                dataset_from_expression(&into.this, options)?
369            } else {
370                options.output_dataset.clone().ok_or_else(|| {
371                    Error::parse(
372                        "OpenLineage outputDataset is required for SELECT statements without SELECT INTO",
373                        0,
374                        0,
375                        0,
376                        0,
377                    )
378                })?
379            };
380            Ok(StatementAnalysis {
381                query: expr.clone(),
382                inputs: collect_input_datasets(expr, options, Some(&output), warnings)?,
383                output,
384                output_column_names: Vec::new(),
385            })
386        }
387        Expression::Insert(insert) => {
388            let output = dataset_from_table_ref(&insert.table, options)?;
389            let query = insert.query.clone().ok_or_else(|| {
390                Error::unsupported(
391                    "OpenLineage column lineage for INSERT without query",
392                    options.dialect.to_string(),
393                )
394            })?;
395            Ok(StatementAnalysis {
396                inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
397                query,
398                output,
399                output_column_names: insert.columns.iter().map(|col| col.name.clone()).collect(),
400            })
401        }
402        Expression::CreateTable(create) => {
403            let output = dataset_from_table_ref(&create.name, options)?;
404            let query = create.as_select.clone().ok_or_else(|| {
405                Error::unsupported(
406                    "OpenLineage column lineage for CREATE TABLE without AS SELECT",
407                    options.dialect.to_string(),
408                )
409            })?;
410            Ok(StatementAnalysis {
411                inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
412                query,
413                output,
414                output_column_names: create
415                    .columns
416                    .iter()
417                    .map(|col| col.name.name.clone())
418                    .collect(),
419            })
420        }
421        _ => Err(Error::unsupported(
422            format!("OpenLineage generation for {}", expr.variant_name()),
423            options.dialect.to_string(),
424        )),
425    }
426}
427
428fn output_fields_for_query(
429    query: &Expression,
430    schema: Option<&dyn Schema>,
431    dialect: DialectType,
432    warnings: &mut Vec<OpenLineageWarning>,
433) -> Result<Vec<OutputField>> {
434    let select = leftmost_select(query).ok_or_else(|| {
435        Error::unsupported(
436            "OpenLineage output field extraction for non-SELECT query",
437            dialect.to_string(),
438        )
439    })?;
440
441    let mut fields = Vec::new();
442    for (idx, expr) in select.expressions.iter().enumerate() {
443        if is_star_expr(expr) {
444            expand_star_output_fields(select, expr, schema, warnings, &mut fields);
445            continue;
446        }
447
448        let name = output_name(expr).unwrap_or_else(|| format!("_{idx}"));
449        fields.push(OutputField {
450            lineage_name: name.clone(),
451            name,
452            expression: Some(expr.clone()),
453            star_source_table: None,
454        });
455    }
456    Ok(fields)
457}
458
459fn apply_output_column_names(
460    fields: &mut [OutputField],
461    output_column_names: &[String],
462    warnings: &mut Vec<OpenLineageWarning>,
463) {
464    if output_column_names.is_empty() {
465        return;
466    }
467    if output_column_names.len() != fields.len() {
468        warnings.push(OpenLineageWarning::new(
469            "W_OUTPUT_COLUMN_COUNT_MISMATCH",
470            format!(
471                "Target column count ({}) does not match projected column count ({})",
472                output_column_names.len(),
473                fields.len()
474            ),
475        ));
476        return;
477    }
478    for (field, output_name) in fields.iter_mut().zip(output_column_names) {
479        field.name = output_name.clone();
480    }
481}
482
483fn input_fields_for_output(
484    query: &Expression,
485    output_field: &OutputField,
486    options: &OpenLineageOptions,
487    schema: Option<&dyn Schema>,
488    warnings: &mut Vec<OpenLineageWarning>,
489) -> Result<Vec<OpenLineageInputField>> {
490    if let Some(table) = &output_field.star_source_table {
491        return terminal_fields_to_openlineage(
492            vec![TerminalField {
493                table: table.clone(),
494                field: output_field.lineage_name.clone(),
495            }],
496            "IDENTITY",
497            Some(format!("SELECT {}", output_field.lineage_name)),
498            options,
499            warnings,
500        );
501    }
502
503    let lineage_result = if let Some(schema) = schema {
504        lineage::lineage_with_schema(
505            &output_field.lineage_name,
506            query,
507            Some(schema),
508            Some(options.dialect),
509            false,
510        )
511    } else {
512        lineage::lineage(
513            &output_field.lineage_name,
514            query,
515            Some(options.dialect),
516            false,
517        )
518    };
519
520    let node = match lineage_result {
521        Ok(node) => node,
522        Err(err) => {
523            warnings.push(OpenLineageWarning::new(
524                "W_UNRESOLVED_OUTPUT_FIELD",
525                format!(
526                    "Could not resolve lineage for output field '{}': {}",
527                    output_field.name, err
528                ),
529            ));
530            return Ok(Vec::new());
531        }
532    };
533
534    let mut terminals = BTreeSet::new();
535    collect_terminal_fields(&node, &mut terminals);
536    let terminals: Vec<TerminalField> = terminals.into_iter().collect();
537
538    if terminals.is_empty() {
539        warnings.push(OpenLineageWarning::new(
540            "W_EMPTY_FIELD_LINEAGE",
541            format!(
542                "No input fields were found for output field '{}'",
543                output_field.name
544            ),
545        ));
546        return Ok(Vec::new());
547    }
548
549    let subtype = transformation_subtype(output_field.expression.as_ref(), &terminals);
550    let description = output_field
551        .expression
552        .as_ref()
553        .and_then(|expr| transformation_description(expr, options.dialect));
554
555    terminal_fields_to_openlineage(terminals, subtype, description, options, warnings)
556}
557
558fn transformation_description(expr: &Expression, dialect: DialectType) -> Option<String> {
559    #[cfg(feature = "generate")]
560    {
561        Some(expr.sql_for(dialect))
562    }
563
564    #[cfg(not(feature = "generate"))]
565    {
566        let _ = (expr, dialect);
567        None
568    }
569}
570
571fn terminal_fields_to_openlineage(
572    terminals: Vec<TerminalField>,
573    subtype: &str,
574    description: Option<String>,
575    options: &OpenLineageOptions,
576    warnings: &mut Vec<OpenLineageWarning>,
577) -> Result<Vec<OpenLineageInputField>> {
578    let mut result = Vec::new();
579    for terminal in terminals {
580        let dataset = dataset_from_table_name(&terminal.table, options).map_err(|err| {
581            warnings.push(OpenLineageWarning::new(
582                "W_UNRESOLVED_DATASET",
583                format!(
584                    "Could not map table '{}' to an OpenLineage dataset: {}",
585                    terminal.table, err
586                ),
587            ));
588            err
589        })?;
590        result.push(OpenLineageInputField {
591            namespace: dataset.namespace,
592            name: dataset.name,
593            field: terminal.field,
594            transformations: vec![OpenLineageTransformation {
595                type_: "DIRECT".to_string(),
596                subtype: subtype.to_string(),
597                description: description.clone(),
598                masking: Some(false),
599            }],
600        });
601    }
602    Ok(result)
603}
604
605fn transformation_subtype(expr: Option<&Expression>, terminals: &[TerminalField]) -> &'static str {
606    let Some(expr) = expr else {
607        return "TRANSFORMATION";
608    };
609    let unaliased = unalias(expr);
610    if expression_contains_aggregate(unaliased) {
611        return "AGGREGATION";
612    }
613    if terminals.len() == 1 {
614        if let Expression::Column(col) = unaliased {
615            if col.name.name == terminals[0].field {
616                return "IDENTITY";
617            }
618        }
619    }
620    "TRANSFORMATION"
621}
622
623fn collect_terminal_fields(node: &LineageNode, terminals: &mut BTreeSet<TerminalField>) {
624    if node.downstream.is_empty() {
625        if let Expression::Column(column) = &node.expression {
626            let table = if !node.source_name.is_empty() {
627                Some(node.source_name.clone())
628            } else if let Expression::Table(table) = &node.source {
629                Some(table_ref_qualified_name(table))
630            } else {
631                column.table.as_ref().map(|t| t.name.clone())
632            };
633            if let Some(table) = table.filter(|t| !t.is_empty()) {
634                terminals.insert(TerminalField {
635                    table,
636                    field: column.name.name.clone(),
637                });
638            }
639        }
640        return;
641    }
642
643    for child in &node.downstream {
644        collect_terminal_fields(child, terminals);
645    }
646}
647
648fn expression_contains_aggregate(expr: &Expression) -> bool {
649    expr.contains(|node| {
650        matches!(
651            node,
652            Expression::AggregateFunction(_)
653                | Expression::Sum(_)
654                | Expression::Count(_)
655                | Expression::Avg(_)
656                | Expression::Min(_)
657                | Expression::Max(_)
658                | Expression::GroupConcat(_)
659                | Expression::StringAgg(_)
660                | Expression::ListAgg(_)
661                | Expression::ArrayAgg(_)
662                | Expression::CountIf(_)
663                | Expression::SumIf(_)
664                | Expression::Stddev(_)
665                | Expression::StddevPop(_)
666                | Expression::StddevSamp(_)
667                | Expression::Variance(_)
668                | Expression::VarPop(_)
669                | Expression::VarSamp(_)
670                | Expression::Median(_)
671                | Expression::Mode(_)
672                | Expression::First(_)
673                | Expression::Last(_)
674                | Expression::AnyValue(_)
675                | Expression::ApproxDistinct(_)
676                | Expression::ApproxCountDistinct(_)
677                | Expression::ApproxPercentile(_)
678                | Expression::Percentile(_)
679                | Expression::LogicalAnd(_)
680                | Expression::LogicalOr(_)
681                | Expression::Skewness(_)
682                | Expression::BitwiseCount(_)
683                | Expression::ArrayConcatAgg(_)
684                | Expression::ArrayUniqueAgg(_)
685                | Expression::BoolXorAgg(_)
686                | Expression::ParameterizedAgg(_)
687                | Expression::ArgMax(_)
688                | Expression::ArgMin(_)
689                | Expression::ApproxTopK(_)
690                | Expression::ApproxTopKAccumulate(_)
691                | Expression::ApproxTopKCombine(_)
692                | Expression::ApproxTopKEstimate(_)
693                | Expression::ApproxTopSum(_)
694                | Expression::ApproxQuantiles(_)
695                | Expression::Grouping(_)
696                | Expression::GroupingId(_)
697                | Expression::AnonymousAggFunc(_)
698                | Expression::CombinedAggFunc(_)
699                | Expression::CombinedParameterizedAgg(_)
700                | Expression::HashAgg(_)
701                | Expression::ObjectAgg(_)
702                | Expression::AIAgg(_)
703        )
704    })
705}
706
707fn collect_input_datasets(
708    expr: &Expression,
709    options: &OpenLineageOptions,
710    output: Option<&OpenLineageDatasetId>,
711    warnings: &mut Vec<OpenLineageWarning>,
712) -> Result<Vec<OpenLineageDatasetId>> {
713    let cte_aliases = collect_cte_aliases(expr, options.dialect);
714    let mut seen = BTreeSet::new();
715    let mut result = Vec::new();
716
717    for table in expr.dfs().filter_map(|node| match node {
718        Expression::Table(table) => Some(table),
719        _ => None,
720    }) {
721        let qname = table_ref_qualified_name(table);
722        let normalized = normalize_identifier(&table.name.name, options.dialect, true);
723        if cte_aliases.contains(&normalized) {
724            continue;
725        }
726        if output
727            .map(|out| out.name == qname || out.name == table.name.name)
728            .unwrap_or(false)
729        {
730            continue;
731        }
732        match dataset_from_table_name(&qname, options) {
733            Ok(dataset) => {
734                if seen.insert((dataset.namespace.clone(), dataset.name.clone())) {
735                    result.push(dataset);
736                }
737            }
738            Err(err) => warnings.push(OpenLineageWarning::new(
739                "W_UNRESOLVED_DATASET",
740                format!("Could not map input table '{qname}': {err}"),
741            )),
742        }
743    }
744
745    Ok(result)
746}
747
748fn attach_output_facets(
749    output: &mut OpenLineageDataset,
750    output_id: &OpenLineageDatasetId,
751    options: &OpenLineageOptions,
752    fields: &BTreeMap<String, ColumnLineageField>,
753) -> Result<()> {
754    let column_lineage = ColumnLineageDatasetFacet {
755        producer: options.producer.clone(),
756        schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
757        fields: fields.clone(),
758    };
759    output.facets.insert(
760        "columnLineage".to_string(),
761        serde_json::to_value(column_lineage).map_err(openlineage_serialization_error)?,
762    );
763
764    if let Some(schema_facet) = schema_facet_for_dataset(output_id, options) {
765        output.facets.insert(
766            "schema".to_string(),
767            serde_json::to_value(schema_facet).map_err(openlineage_serialization_error)?,
768        );
769    }
770
771    Ok(())
772}
773
774fn job_facets(sql: &str, options: &OpenLineageOptions) -> Value {
775    json!({
776        "sql": {
777            "_producer": options.producer,
778            "_schemaURL": SQL_JOB_FACET_SCHEMA_URL,
779            "query": sql,
780            "dialect": options.dialect.to_string(),
781        },
782        "jobType": {
783            "_producer": options.producer,
784            "_schemaURL": JOB_TYPE_JOB_FACET_SCHEMA_URL,
785            "processingType": "BATCH",
786            "integration": "POLYGLOT_SQL",
787            "jobType": "QUERY",
788        }
789    })
790}
791
792#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
793struct SchemaDatasetFacet {
794    #[serde(rename = "_producer")]
795    producer: String,
796    #[serde(rename = "_schemaURL")]
797    schema_url: String,
798    fields: Vec<SchemaDatasetFacetField>,
799}
800
801#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
802struct SchemaDatasetFacetField {
803    name: String,
804    #[serde(skip_serializing_if = "String::is_empty", default)]
805    #[serde(rename = "type")]
806    data_type: String,
807    #[serde(skip_serializing_if = "Option::is_none")]
808    ordinal_position: Option<usize>,
809}
810
811fn schema_facet_for_dataset(
812    output: &OpenLineageDatasetId,
813    options: &OpenLineageOptions,
814) -> Option<SchemaDatasetFacet> {
815    let schema = options.schema.as_ref()?;
816    let table = schema.tables.iter().find(|table| {
817        let qname = if let Some(schema_name) = &table.schema {
818            format!("{}.{}", schema_name, table.name)
819        } else {
820            table.name.clone()
821        };
822        output.name == table.name || output.name == qname
823    })?;
824
825    Some(SchemaDatasetFacet {
826        producer: options.producer.clone(),
827        schema_url: SCHEMA_DATASET_FACET_SCHEMA_URL.to_string(),
828        fields: table
829            .columns
830            .iter()
831            .enumerate()
832            .map(|(idx, col)| SchemaDatasetFacetField {
833                name: col.name.clone(),
834                data_type: col.data_type.clone(),
835                ordinal_position: Some(idx + 1),
836            })
837            .collect(),
838    })
839}
840
841fn expand_star_output_fields(
842    select: &Select,
843    star_expr: &Expression,
844    schema: Option<&dyn Schema>,
845    warnings: &mut Vec<OpenLineageWarning>,
846    fields: &mut Vec<OutputField>,
847) {
848    let Some(schema) = schema else {
849        warnings.push(OpenLineageWarning::new(
850            "W_STAR_WITHOUT_SCHEMA",
851            "SELECT * cannot be expanded into OpenLineage column lineage without schema metadata",
852        ));
853        return;
854    };
855
856    let qualifier = star_qualifier(star_expr);
857    let sources = select_source_tables(select);
858    for (alias, qname) in sources {
859        if qualifier
860            .as_ref()
861            .map(|q| q != &alias && q != &qname)
862            .unwrap_or(false)
863        {
864            continue;
865        }
866        match schema.column_names(&qname) {
867            Ok(columns) => {
868                for name in columns {
869                    fields.push(OutputField {
870                        lineage_name: name.clone(),
871                        name,
872                        expression: None,
873                        star_source_table: Some(qname.clone()),
874                    });
875                }
876            }
877            Err(err) => warnings.push(OpenLineageWarning::new(
878                "W_STAR_SCHEMA_LOOKUP_FAILED",
879                format!("Could not expand SELECT * for table '{}': {}", qname, err),
880            )),
881        }
882    }
883}
884
885fn select_source_tables(select: &Select) -> Vec<(String, String)> {
886    let mut result = Vec::new();
887    if let Some(from) = &select.from {
888        for expr in &from.expressions {
889            collect_source_table(expr, &mut result);
890        }
891    }
892    for join in &select.joins {
893        collect_source_table(&join.this, &mut result);
894    }
895    result
896}
897
898fn collect_source_table(expr: &Expression, result: &mut Vec<(String, String)>) {
899    match expr {
900        Expression::Table(table) => {
901            let qname = table_ref_qualified_name(table);
902            let alias = table
903                .alias
904                .as_ref()
905                .map(|a| a.name.clone())
906                .unwrap_or_else(|| table.name.name.clone());
907            result.push((alias, qname));
908        }
909        Expression::Alias(alias) => collect_source_table(&alias.this, result),
910        Expression::Paren(paren) => collect_source_table(&paren.this, result),
911        _ => {}
912    }
913}
914
915fn leftmost_select(expr: &Expression) -> Option<&Select> {
916    match expr {
917        Expression::Select(select) => Some(select),
918        Expression::Union(union) => leftmost_select(&union.left),
919        Expression::Intersect(intersect) => leftmost_select(&intersect.left),
920        Expression::Except(except) => leftmost_select(&except.left),
921        Expression::Subquery(subquery) => leftmost_select(&subquery.this),
922        _ => None,
923    }
924}
925
926fn output_name(expr: &Expression) -> Option<String> {
927    match expr {
928        Expression::Alias(alias) => Some(alias.alias.name.clone()),
929        Expression::Column(col) => Some(col.name.name.clone()),
930        Expression::Identifier(id) => Some(id.name.clone()),
931        Expression::Annotated(a) => output_name(&a.this),
932        _ => None,
933    }
934}
935
936fn unalias(expr: &Expression) -> &Expression {
937    match expr {
938        Expression::Alias(alias) => &alias.this,
939        Expression::Annotated(a) => unalias(&a.this),
940        _ => expr,
941    }
942}
943
944fn is_star_expr(expr: &Expression) -> bool {
945    matches!(expr, Expression::Star(_))
946        || matches!(expr, Expression::Column(col) if col.name.name == "*")
947}
948
949fn star_qualifier(expr: &Expression) -> Option<String> {
950    match expr {
951        Expression::Star(star) => star.table.as_ref().map(|t| t.name.clone()),
952        Expression::Column(col) if col.name.name == "*" => {
953            col.table.as_ref().map(|t| t.name.clone())
954        }
955        _ => None,
956    }
957}
958
959fn dataset_from_expression(
960    expr: &Expression,
961    options: &OpenLineageOptions,
962) -> Result<OpenLineageDatasetId> {
963    match expr {
964        Expression::Table(table) => dataset_from_table_ref(table, options),
965        Expression::Identifier(id) => dataset_from_table_name(&id.name, options),
966        _ => Err(Error::unsupported(
967            "OpenLineage dataset extraction from non-table expression",
968            options.dialect.to_string(),
969        )),
970    }
971}
972
973fn dataset_from_table_ref(
974    table: &TableRef,
975    options: &OpenLineageOptions,
976) -> Result<OpenLineageDatasetId> {
977    dataset_from_table_name(&table_ref_qualified_name(table), options)
978}
979
980fn dataset_from_table_name(
981    table_name: &str,
982    options: &OpenLineageOptions,
983) -> Result<OpenLineageDatasetId> {
984    if let Some(mapped) = options.dataset_mappings.get(table_name) {
985        return Ok(mapped.clone());
986    }
987    let namespace = options.dataset_namespace.as_ref().ok_or_else(|| {
988        Error::parse(
989            format!(
990                "Missing datasetNamespace or explicit dataset mapping for table '{}'",
991                table_name
992            ),
993            0,
994            0,
995            0,
996            0,
997        )
998    })?;
999    Ok(OpenLineageDatasetId::new(namespace, table_name))
1000}
1001
1002fn table_ref_qualified_name(table: &TableRef) -> String {
1003    let mut parts = Vec::new();
1004    if let Some(catalog) = &table.catalog {
1005        parts.push(catalog.name.clone());
1006    }
1007    if let Some(schema) = &table.schema {
1008        parts.push(schema.name.clone());
1009    }
1010    parts.push(table.name.name.clone());
1011    parts.join(".")
1012}
1013
1014fn collect_cte_aliases(expr: &Expression, dialect: DialectType) -> HashSet<String> {
1015    let mut aliases = HashSet::new();
1016    for node in expr.dfs() {
1017        match node {
1018            Expression::Select(select) => {
1019                if let Some(with) = &select.with {
1020                    collect_with_aliases(with, dialect, &mut aliases);
1021                }
1022            }
1023            Expression::Union(union) => {
1024                if let Some(with) = &union.with {
1025                    collect_with_aliases(with, dialect, &mut aliases);
1026                }
1027            }
1028            Expression::Intersect(intersect) => {
1029                if let Some(with) = &intersect.with {
1030                    collect_with_aliases(with, dialect, &mut aliases);
1031                }
1032            }
1033            Expression::Except(except) => {
1034                if let Some(with) = &except.with {
1035                    collect_with_aliases(with, dialect, &mut aliases);
1036                }
1037            }
1038            _ => {}
1039        }
1040    }
1041    aliases
1042}
1043
1044fn collect_with_aliases(with: &With, dialect: DialectType, aliases: &mut HashSet<String>) {
1045    for cte in &with.ctes {
1046        aliases.insert(normalize_identifier(&cte.alias.name, dialect, true));
1047    }
1048}
1049
1050fn normalize_identifier(name: &str, dialect: DialectType, is_table: bool) -> String {
1051    crate::schema::normalize_name(name, Some(dialect), is_table, true)
1052}
1053
1054fn openlineage_serialization_error(err: serde_json::Error) -> Error {
1055    Error::internal(format!("OpenLineage serialization failed: {err}"))
1056}
1057
1058fn deserialize_dialect_type<'de, D>(deserializer: D) -> std::result::Result<DialectType, D::Error>
1059where
1060    D: Deserializer<'de>,
1061{
1062    let value = String::deserialize(deserializer)?;
1063    value.parse::<DialectType>().map_err(de::Error::custom)
1064}
1065
// Unit tests for OpenLineage payload generation: column lineage facets,
// output-dataset inference, job/run event payloads and `SELECT *` handling.
#[cfg(test)]
mod tests {
    use super::*;

    // Baseline PostgreSQL options. Namespaces, output dataset, job identity,
    // event time and run id are fixed so tests can assert on exact values.
    fn options() -> OpenLineageOptions {
        OpenLineageOptions {
            dialect: DialectType::PostgreSQL,
            producer: "https://github.com/tobilg/polyglot".to_string(),
            dataset_namespace: Some("postgres://warehouse".to_string()),
            output_dataset: Some(OpenLineageDatasetId::new(
                "postgres://warehouse",
                "analytics.out",
            )),
            job_namespace: Some("polyglot-tests".to_string()),
            job_name: Some("lineage-test".to_string()),
            event_time: Some("2026-05-18T00:00:00Z".to_string()),
            run_id: Some("3b452093-782c-4ef2-9c0c-aafe2aa6f34d".to_string()),
            event_type: Some(OpenLineageRunEventType::Complete),
            ..Default::default()
        }
    }

    // The dialect may be given by alias ("postgres") in JSON options.
    #[test]
    fn deserializes_dialect_aliases_in_options() {
        let options: OpenLineageOptions =
            serde_json::from_str(r#"{"producer":"polyglot","dialect":"postgres"}"#)
                .expect("options");
        assert_eq!(options.dialect, DialectType::PostgreSQL);
    }

    // A plain column projection maps to one input field with an IDENTITY
    // transformation.
    #[test]
    fn emits_identity_column_lineage_for_select() {
        let result = openlineage_column_lineage("SELECT a FROM t", &options()).expect("lineage");
        let field = result.facet.fields.get("a").expect("field a");
        assert_eq!(field.input_fields.len(), 1);
        assert_eq!(field.input_fields[0].name, "t");
        assert_eq!(field.input_fields[0].field, "a");
        assert_eq!(field.input_fields[0].transformations[0].subtype, "IDENTITY");
    }

    // Table aliases are resolved back to the real input dataset name.
    #[test]
    fn resolves_input_dataset_behind_table_alias() {
        let result = openlineage_column_lineage("SELECT o.total FROM orders o", &options())
            .expect("lineage");
        let field = result.facet.fields.get("total").expect("field total");
        assert_eq!(field.input_fields[0].name, "orders");
        assert_eq!(field.input_fields[0].field, "total");
    }

    // A computed expression lists every contributing column, each tagged
    // TRANSFORMATION.
    #[test]
    fn emits_transformation_column_lineage_for_expression() {
        let result =
            openlineage_column_lineage("SELECT a + b AS c FROM t", &options()).expect("lineage");
        let field = result.facet.fields.get("c").expect("field c");
        assert_eq!(field.input_fields.len(), 2);
        assert!(field.input_fields.iter().any(|f| f.field == "a"));
        assert!(field.input_fields.iter().any(|f| f.field == "b"));
        assert!(field
            .input_fields
            .iter()
            .all(|f| f.transformations[0].subtype == "TRANSFORMATION"));
    }

    // Aggregate functions produce an AGGREGATION transformation.
    #[test]
    fn emits_aggregation_column_lineage() {
        let result =
            openlineage_column_lineage("SELECT SUM(amount) AS total FROM orders", &options())
                .expect("lineage");
        let field = result.facet.fields.get("total").expect("field total");
        assert_eq!(field.input_fields[0].field, "amount");
        assert_eq!(
            field.input_fields[0].transformations[0].subtype,
            "AGGREGATION"
        );
    }

    // Without an explicit output dataset, INSERT INTO's target becomes the
    // output.
    #[test]
    fn infers_insert_output_dataset() {
        let mut opts = options();
        opts.output_dataset = None;
        let result =
            openlineage_column_lineage("INSERT INTO analytics.out SELECT a FROM raw.input", &opts)
                .expect("lineage");
        assert_eq!(result.outputs[0].name, "analytics.out");
        assert_eq!(result.inputs[0].name, "raw.input");
    }

    // An INSERT column list renames output fields: lineage is keyed by the
    // target column, not the source projection name.
    #[test]
    fn maps_insert_target_columns_to_output_fields() {
        let mut opts = options();
        opts.output_dataset = None;
        let result = openlineage_column_lineage(
            "INSERT INTO analytics.out (target_a) SELECT source_a FROM raw.input",
            &opts,
        )
        .expect("lineage");
        let field = result.facet.fields.get("target_a").expect("target field");
        assert_eq!(field.input_fields[0].field, "source_a");
        assert!(!result.facet.fields.contains_key("source_a"));
    }

    // A bare SELECT has no inferable target, so outputDataset must be
    // supplied.
    #[test]
    fn pure_select_requires_output_dataset() {
        let mut opts = options();
        opts.output_dataset = None;
        let err = openlineage_column_lineage("SELECT a FROM t", &opts).unwrap_err();
        assert!(err.to_string().contains("outputDataset is required"));
    }

    // The job event carries the job namespace, the SQL job facet and the
    // column lineage facet on its outputs.
    #[test]
    fn emits_job_event_payload() {
        let result = openlineage_job_event("SELECT a FROM t", &options()).expect("event");
        assert_eq!(result.event["job"]["namespace"], "polyglot-tests");
        assert_eq!(
            result.event["job"]["facets"]["sql"]["_schemaURL"],
            SQL_JOB_FACET_SCHEMA_URL
        );
        assert_eq!(
            result.event["outputs"][0]["facets"]["columnLineage"]["fields"]["a"]["inputFields"][0]
                ["field"],
            "a"
        );
    }

    // The run event echoes the configured event type and run id.
    #[test]
    fn emits_run_event_payload() {
        let result = openlineage_run_event("SELECT a FROM t", &options()).expect("event");
        assert_eq!(result.event["eventType"], "COMPLETE");
        assert_eq!(
            result.event["run"]["runId"],
            "3b452093-782c-4ef2-9c0c-aafe2aa6f34d"
        );
    }

    // Without a schema `SELECT *` cannot be expanded: no fields are emitted
    // and a W_STAR_WITHOUT_SCHEMA warning is reported instead.
    #[test]
    fn select_star_without_schema_warns() {
        let result = openlineage_column_lineage("SELECT * FROM t", &options()).expect("lineage");
        assert!(result.facet.fields.is_empty());
        assert!(result
            .warnings
            .iter()
            .any(|w| w.code == "W_STAR_WITHOUT_SCHEMA"));
    }

    // With a schema describing `t(a, b)`, `SELECT *` expands to both columns.
    #[test]
    fn select_star_with_schema_expands_fields() {
        let mut opts = options();
        opts.schema = Some(ValidationSchema {
            strict: None,
            tables: vec![crate::validation::SchemaTable {
                name: "t".to_string(),
                schema: None,
                columns: vec![
                    crate::validation::SchemaColumn {
                        name: "a".to_string(),
                        data_type: "INT".to_string(),
                        nullable: None,
                        primary_key: false,
                        unique: false,
                        references: None,
                    },
                    crate::validation::SchemaColumn {
                        name: "b".to_string(),
                        data_type: "TEXT".to_string(),
                        nullable: None,
                        primary_key: false,
                        unique: false,
                        references: None,
                    },
                ],
                aliases: vec![],
                primary_key: vec![],
                unique_keys: vec![],
                foreign_keys: vec![],
            }],
        });

        let result = openlineage_column_lineage("SELECT * FROM t", &opts).expect("lineage");
        assert!(result.facet.fields.contains_key("a"));
        assert!(result.facet.fields.contains_key("b"));
    }
}