1use crate::dialects::{Dialect, DialectType};
8use crate::expressions::*;
9use crate::lineage::{self, LineageNode};
10use crate::schema::Schema;
11use crate::traversal::ExpressionWalk;
12use crate::{mapping_schema_from_validation_schema, Error, Result, ValidationSchema};
13use serde::de::{self, Deserializer};
14use serde::{Deserialize, Serialize};
15use serde_json::{json, Value};
16use std::collections::{BTreeMap, BTreeSet, HashSet};
17
/// `schemaURL` written into every run and job event produced by this module.
pub const OPENLINEAGE_SCHEMA_URL: &str = "https://openlineage.io/spec/2-0-2/OpenLineage.json";
/// `_schemaURL` of the `columnLineage` dataset facet attached to output datasets.
pub const COLUMN_LINEAGE_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json";
/// `_schemaURL` of the `sql` job facet (carries the query text and dialect).
pub const SQL_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-1-0/SQLJobFacet.json";
/// `_schemaURL` of the `jobType` job facet.
pub const JOB_TYPE_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json";
/// `_schemaURL` of the `schema` dataset facet built from the optional validation schema.
pub const SCHEMA_DATASET_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/SchemaDatasetFacet.json";
27
/// Identifies an OpenLineage dataset by `namespace` and `name`.
///
/// Serialized with camelCase keys. The derived ordering compares `namespace` first, then
/// `name` (field declaration order).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageDatasetId {
    /// Dataset namespace, e.g. the `datasetNamespace` option or an explicit mapping's value.
    pub namespace: String,
    /// Dataset name inside the namespace; for unmapped tables this is the dot-joined
    /// qualified table name.
    pub name: String,
}

impl OpenLineageDatasetId {
    /// Builds an identifier from anything convertible into `String`.
    pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
        Self {
            namespace: namespace.into(),
            name: name.into(),
        }
    }
}
44
/// Options controlling SQL parsing and the shape of the generated OpenLineage payloads.
///
/// Deserialized with camelCase keys. Because of the struct-level `#[serde(default)]`, any
/// field may be omitted on input and falls back to its `Default` value.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct OpenLineageOptions {
    /// Dialect used to parse the SQL. Deserialized from a string via `DialectType`'s
    /// `FromStr`, so aliases such as `"postgres"` are accepted.
    #[serde(deserialize_with = "deserialize_dialect_type")]
    pub dialect: DialectType,
    /// Producer URI written to `producer` / `_producer`; must not be blank.
    pub producer: String,
    /// Fallback namespace for tables without an entry in `dataset_mappings`.
    pub dataset_namespace: Option<String>,
    /// Explicit dataset ids keyed by the dot-joined `catalog.schema.table` name of a table
    /// reference.
    pub dataset_mappings: BTreeMap<String, OpenLineageDatasetId>,
    /// Output dataset for a plain `SELECT` that has no `SELECT INTO` target.
    pub output_dataset: Option<OpenLineageDatasetId>,
    /// Optional table metadata. Enables `SELECT *` expansion, schema-aware lineage
    /// resolution and the output `schema` facet.
    pub schema: Option<ValidationSchema>,
    /// Job namespace; required by the job and run event builders.
    pub job_namespace: Option<String>,
    /// Job name; required by the job and run event builders.
    pub job_name: Option<String>,
    /// Copied verbatim into `eventTime`; required by the job and run event builders.
    pub event_time: Option<String>,
    /// Run id; required by the run event builder.
    pub run_id: Option<String>,
    /// Run event type; required by the run event builder.
    pub event_type: Option<OpenLineageRunEventType>,
}
62
/// OpenLineage run state carried in a run event's `eventType`.
///
/// Serialized in SCREAMING_SNAKE_CASE, e.g. `Complete` becomes `"COMPLETE"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum OpenLineageRunEventType {
    Start,
    Running,
    Complete,
    Abort,
    Fail,
    Other,
}
74
/// A non-fatal problem encountered while building lineage.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageWarning {
    /// Stable machine-readable code such as `W_UNRESOLVED_DATASET`.
    pub code: String,
    /// Human-readable explanation.
    pub message: String,
}

impl OpenLineageWarning {
    /// Creates a warning from a code and a message.
    fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
        Self {
            code: code.into(),
            message: message.into(),
        }
    }
}
91
/// Result of [`openlineage_column_lineage`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageColumnLineageResult {
    /// The column-lineage facet; the same content is also attached to `outputs[0]`.
    pub facet: ColumnLineageDatasetFacet,
    /// Distinct input datasets referenced by the statement.
    pub inputs: Vec<OpenLineageDataset>,
    /// The single output dataset, with its facets attached.
    pub outputs: Vec<OpenLineageDataset>,
    /// Non-fatal issues collected while resolving lineage.
    pub warnings: Vec<OpenLineageWarning>,
}
100
/// A complete OpenLineage event (as JSON) plus the warnings produced while building it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageEventResult {
    /// The serialized job or run event.
    pub event: Value,
    /// Non-fatal issues collected while resolving lineage.
    pub warnings: Vec<OpenLineageWarning>,
}
107
/// An OpenLineage dataset entry as emitted in `inputs` / `outputs`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OpenLineageDataset {
    pub namespace: String,
    pub name: String,
    /// Dataset facets keyed by facet name (e.g. `columnLineage`, `schema`); omitted from
    /// the JSON when empty.
    #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
    pub facets: BTreeMap<String, Value>,
}

impl std::convert::From<OpenLineageDatasetId> for OpenLineageDataset {
    /// Wraps a dataset id with no facets attached.
    fn from(id: OpenLineageDatasetId) -> Self {
        Self {
            namespace: id.namespace,
            name: id.name,
            facets: BTreeMap::new(),
        }
    }
}
125
/// The OpenLineage `ColumnLineageDatasetFacet`: output column name -> contributing inputs.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnLineageDatasetFacet {
    #[serde(rename = "_producer")]
    pub producer: String,
    #[serde(rename = "_schemaURL")]
    pub schema_url: String,
    /// Keyed by output column name.
    pub fields: BTreeMap<String, ColumnLineageField>,
}
134
/// Lineage of a single output column.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnLineageField {
    /// Source columns feeding this output column (serialized as `inputFields`).
    pub input_fields: Vec<OpenLineageInputField>,
}
140
/// A source column (dataset + field) contributing to an output column.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageInputField {
    /// Namespace of the source dataset.
    pub namespace: String,
    /// Name of the source dataset.
    pub name: String,
    /// Source column name.
    pub field: String,
    /// How the source column is transformed; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub transformations: Vec<OpenLineageTransformation>,
}
150
/// A transformation entry of the column-lineage facet.
///
/// This module emits `type_ = "DIRECT"` with a subtype of `IDENTITY`, `TRANSFORMATION` or
/// `AGGREGATION`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OpenLineageTransformation {
    /// Serialized as `type` (a Rust keyword, hence the trailing underscore).
    #[serde(rename = "type")]
    pub type_: String,
    pub subtype: String,
    /// Optional textual description, e.g. the rendered SQL expression.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub masking: Option<bool>,
}
161
/// Intermediate view of a supported statement, produced by `analyze_statement`.
#[derive(Debug, Clone)]
struct StatementAnalysis {
    /// The query whose projection defines the output columns.
    query: Expression,
    /// Distinct input datasets referenced by `query`.
    inputs: Vec<OpenLineageDatasetId>,
    /// The dataset being written.
    output: OpenLineageDatasetId,
    /// Explicit target column list (INSERT columns / CTAS column defs); empty when absent.
    output_column_names: Vec<String>,
}
169
/// One projected output column.
#[derive(Debug, Clone)]
struct OutputField {
    /// Name used as the key of the column-lineage `fields` map (may be renamed by an
    /// explicit target column list).
    name: String,
    /// Name passed to the lineage resolver (the projection's own name).
    lineage_name: String,
    /// The projection expression; `None` for columns produced by expanding `*`.
    expression: Option<Expression>,
    /// For `*`-expanded columns, the qualified source table the column comes from.
    star_source_table: Option<String>,
}
177
/// A leaf column reached by lineage resolution: qualified table name plus column name.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct TerminalField {
    table: String,
    field: String,
}
183
184pub fn openlineage_column_lineage(
186 sql: &str,
187 options: &OpenLineageOptions,
188) -> Result<OpenLineageColumnLineageResult> {
189 validate_common_options(options)?;
190
191 let mut warnings = Vec::new();
192 let schema_mapping = options
193 .schema
194 .as_ref()
195 .map(mapping_schema_from_validation_schema);
196 let dialect = Dialect::get(options.dialect);
197 let mut expressions = dialect.parse(sql)?;
198 if expressions.len() != 1 {
199 return Err(Error::parse(
200 format!(
201 "OpenLineage generation expects exactly one statement, found {}",
202 expressions.len()
203 ),
204 0,
205 0,
206 0,
207 0,
208 ));
209 }
210
211 let expr = expressions.remove(0);
212 let analysis = analyze_statement(&expr, options, &mut warnings)?;
213 let mut output_fields = output_fields_for_query(
214 &analysis.query,
215 schema_mapping.as_ref().map(|s| s as &dyn Schema),
216 options.dialect,
217 &mut warnings,
218 )?;
219 apply_output_column_names(
220 &mut output_fields,
221 &analysis.output_column_names,
222 &mut warnings,
223 );
224
225 let mut fields = BTreeMap::new();
226 for output_field in output_fields {
227 if fields.contains_key(&output_field.name) {
228 warnings.push(OpenLineageWarning::new(
229 "W_DUPLICATE_OUTPUT_FIELD",
230 format!(
231 "Duplicate output field '{}' was merged in the OpenLineage fields map",
232 output_field.name
233 ),
234 ));
235 }
236
237 let input_fields = input_fields_for_output(
238 &analysis.query,
239 &output_field,
240 options,
241 schema_mapping.as_ref().map(|s| s as &dyn Schema),
242 &mut warnings,
243 )?;
244
245 fields.insert(output_field.name, ColumnLineageField { input_fields });
246 }
247
248 let mut outputs = vec![OpenLineageDataset::from(analysis.output.clone())];
249 attach_output_facets(&mut outputs[0], &analysis.output, options, &fields)?;
250
251 Ok(OpenLineageColumnLineageResult {
252 facet: ColumnLineageDatasetFacet {
253 producer: options.producer.clone(),
254 schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
255 fields,
256 },
257 inputs: analysis
258 .inputs
259 .into_iter()
260 .map(OpenLineageDataset::from)
261 .collect(),
262 outputs,
263 warnings,
264 })
265}
266
267pub fn openlineage_job_event(
269 sql: &str,
270 options: &OpenLineageOptions,
271) -> Result<OpenLineageEventResult> {
272 let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
273 let job_name = required_option(&options.job_name, "jobName")?;
274 let event_time = required_option(&options.event_time, "eventTime")?;
275
276 let result = openlineage_column_lineage(sql, options)?;
277 let event = json!({
278 "eventTime": event_time,
279 "producer": options.producer,
280 "schemaURL": OPENLINEAGE_SCHEMA_URL,
281 "job": {
282 "namespace": job_namespace,
283 "name": job_name,
284 "facets": job_facets(sql, options),
285 },
286 "inputs": result.inputs,
287 "outputs": result.outputs,
288 });
289
290 Ok(OpenLineageEventResult {
291 event,
292 warnings: result.warnings,
293 })
294}
295
296pub fn openlineage_run_event(
298 sql: &str,
299 options: &OpenLineageOptions,
300) -> Result<OpenLineageEventResult> {
301 let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
302 let job_name = required_option(&options.job_name, "jobName")?;
303 let event_time = required_option(&options.event_time, "eventTime")?;
304 let run_id = required_option(&options.run_id, "runId")?;
305 let event_type = options
306 .event_type
307 .ok_or_else(|| Error::parse("Missing required option: eventType", 0, 0, 0, 0))?;
308
309 let result = openlineage_column_lineage(sql, options)?;
310 let event = json!({
311 "eventTime": event_time,
312 "eventType": event_type,
313 "producer": options.producer,
314 "schemaURL": OPENLINEAGE_SCHEMA_URL,
315 "run": {
316 "runId": run_id,
317 "facets": {},
318 },
319 "job": {
320 "namespace": job_namespace,
321 "name": job_name,
322 "facets": job_facets(sql, options),
323 },
324 "inputs": result.inputs,
325 "outputs": result.outputs,
326 });
327
328 Ok(OpenLineageEventResult {
329 event,
330 warnings: result.warnings,
331 })
332}
333
334fn validate_common_options(options: &OpenLineageOptions) -> Result<()> {
335 if options.producer.trim().is_empty() {
336 return Err(Error::parse(
337 "Missing required option: producer",
338 0,
339 0,
340 0,
341 0,
342 ));
343 }
344 Ok(())
345}
346
347fn required_option(value: &Option<String>, name: &str) -> Result<String> {
348 match value.as_ref().filter(|v| !v.trim().is_empty()) {
349 Some(value) => Ok(value.clone()),
350 None => Err(Error::parse(
351 format!("Missing required option: {name}"),
352 0,
353 0,
354 0,
355 0,
356 )),
357 }
358}
359
360fn analyze_statement(
361 expr: &Expression,
362 options: &OpenLineageOptions,
363 warnings: &mut Vec<OpenLineageWarning>,
364) -> Result<StatementAnalysis> {
365 match expr {
366 Expression::Prepare(prepare) => analyze_statement(&prepare.statement, options, warnings),
367 Expression::Select(select) => {
368 let output = if let Some(into) = &select.into {
369 dataset_from_expression(&into.this, options)?
370 } else {
371 options.output_dataset.clone().ok_or_else(|| {
372 Error::parse(
373 "OpenLineage outputDataset is required for SELECT statements without SELECT INTO",
374 0,
375 0,
376 0,
377 0,
378 )
379 })?
380 };
381 Ok(StatementAnalysis {
382 query: expr.clone(),
383 inputs: collect_input_datasets(expr, options, Some(&output), warnings)?,
384 output,
385 output_column_names: Vec::new(),
386 })
387 }
388 Expression::Insert(insert) => {
389 let output = dataset_from_table_ref(&insert.table, options)?;
390 let query = insert.query.clone().ok_or_else(|| {
391 Error::unsupported(
392 "OpenLineage column lineage for INSERT without query",
393 options.dialect.to_string(),
394 )
395 })?;
396 Ok(StatementAnalysis {
397 inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
398 query,
399 output,
400 output_column_names: insert.columns.iter().map(|col| col.name.clone()).collect(),
401 })
402 }
403 Expression::CreateTable(create) => {
404 let output = dataset_from_table_ref(&create.name, options)?;
405 let query = create.as_select.clone().ok_or_else(|| {
406 Error::unsupported(
407 "OpenLineage column lineage for CREATE TABLE without AS SELECT",
408 options.dialect.to_string(),
409 )
410 })?;
411 Ok(StatementAnalysis {
412 inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
413 query,
414 output,
415 output_column_names: create
416 .columns
417 .iter()
418 .map(|col| col.name.name.clone())
419 .collect(),
420 })
421 }
422 _ => Err(Error::unsupported(
423 format!("OpenLineage generation for {}", expr.variant_name()),
424 options.dialect.to_string(),
425 )),
426 }
427}
428
429fn output_fields_for_query(
430 query: &Expression,
431 schema: Option<&dyn Schema>,
432 dialect: DialectType,
433 warnings: &mut Vec<OpenLineageWarning>,
434) -> Result<Vec<OutputField>> {
435 let select = leftmost_select(query).ok_or_else(|| {
436 Error::unsupported(
437 "OpenLineage output field extraction for non-SELECT query",
438 dialect.to_string(),
439 )
440 })?;
441
442 let mut fields = Vec::new();
443 for (idx, expr) in select.expressions.iter().enumerate() {
444 if is_star_expr(expr) {
445 expand_star_output_fields(select, expr, schema, warnings, &mut fields);
446 continue;
447 }
448
449 let name = output_name(expr).unwrap_or_else(|| format!("_{idx}"));
450 fields.push(OutputField {
451 lineage_name: name.clone(),
452 name,
453 expression: Some(expr.clone()),
454 star_source_table: None,
455 });
456 }
457 Ok(fields)
458}
459
460fn apply_output_column_names(
461 fields: &mut [OutputField],
462 output_column_names: &[String],
463 warnings: &mut Vec<OpenLineageWarning>,
464) {
465 if output_column_names.is_empty() {
466 return;
467 }
468 if output_column_names.len() != fields.len() {
469 warnings.push(OpenLineageWarning::new(
470 "W_OUTPUT_COLUMN_COUNT_MISMATCH",
471 format!(
472 "Target column count ({}) does not match projected column count ({})",
473 output_column_names.len(),
474 fields.len()
475 ),
476 ));
477 return;
478 }
479 for (field, output_name) in fields.iter_mut().zip(output_column_names) {
480 field.name = output_name.clone();
481 }
482}
483
484fn input_fields_for_output(
485 query: &Expression,
486 output_field: &OutputField,
487 options: &OpenLineageOptions,
488 schema: Option<&dyn Schema>,
489 warnings: &mut Vec<OpenLineageWarning>,
490) -> Result<Vec<OpenLineageInputField>> {
491 if let Some(table) = &output_field.star_source_table {
492 return terminal_fields_to_openlineage(
493 vec![TerminalField {
494 table: table.clone(),
495 field: output_field.lineage_name.clone(),
496 }],
497 "IDENTITY",
498 Some(format!("SELECT {}", output_field.lineage_name)),
499 options,
500 warnings,
501 );
502 }
503
504 let lineage_result = if let Some(schema) = schema {
505 lineage::lineage_with_schema(
506 &output_field.lineage_name,
507 query,
508 Some(schema),
509 Some(options.dialect),
510 false,
511 )
512 } else {
513 lineage::lineage(
514 &output_field.lineage_name,
515 query,
516 Some(options.dialect),
517 false,
518 )
519 };
520
521 let node = match lineage_result {
522 Ok(node) => node,
523 Err(err) => {
524 warnings.push(OpenLineageWarning::new(
525 "W_UNRESOLVED_OUTPUT_FIELD",
526 format!(
527 "Could not resolve lineage for output field '{}': {}",
528 output_field.name, err
529 ),
530 ));
531 return Ok(Vec::new());
532 }
533 };
534
535 let mut terminals = BTreeSet::new();
536 collect_terminal_fields(&node, &mut terminals);
537 let terminals: Vec<TerminalField> = terminals.into_iter().collect();
538
539 if terminals.is_empty() {
540 warnings.push(OpenLineageWarning::new(
541 "W_EMPTY_FIELD_LINEAGE",
542 format!(
543 "No input fields were found for output field '{}'",
544 output_field.name
545 ),
546 ));
547 return Ok(Vec::new());
548 }
549
550 let subtype = transformation_subtype(output_field.expression.as_ref(), &terminals);
551 let description = output_field
552 .expression
553 .as_ref()
554 .and_then(|expr| transformation_description(expr, options.dialect));
555
556 terminal_fields_to_openlineage(terminals, subtype, description, options, warnings)
557}
558
/// Renders `expr` as SQL for the transformation `description`.
///
/// With the `generate` feature enabled this calls `sql_for(dialect)`. Without it, SQL
/// generation is unavailable and `None` is returned.
fn transformation_description(expr: &Expression, dialect: DialectType) -> Option<String> {
    #[cfg(feature = "generate")]
    {
        Some(expr.sql_for(dialect))
    }

    #[cfg(not(feature = "generate"))]
    {
        // Silences unused-parameter warnings in builds without SQL generation.
        let _ = (expr, dialect);
        None
    }
}
571
572fn terminal_fields_to_openlineage(
573 terminals: Vec<TerminalField>,
574 subtype: &str,
575 description: Option<String>,
576 options: &OpenLineageOptions,
577 warnings: &mut Vec<OpenLineageWarning>,
578) -> Result<Vec<OpenLineageInputField>> {
579 let mut result = Vec::new();
580 for terminal in terminals {
581 let dataset = dataset_from_table_name(&terminal.table, options).map_err(|err| {
582 warnings.push(OpenLineageWarning::new(
583 "W_UNRESOLVED_DATASET",
584 format!(
585 "Could not map table '{}' to an OpenLineage dataset: {}",
586 terminal.table, err
587 ),
588 ));
589 err
590 })?;
591 result.push(OpenLineageInputField {
592 namespace: dataset.namespace,
593 name: dataset.name,
594 field: terminal.field,
595 transformations: vec![OpenLineageTransformation {
596 type_: "DIRECT".to_string(),
597 subtype: subtype.to_string(),
598 description: description.clone(),
599 masking: Some(false),
600 }],
601 });
602 }
603 Ok(result)
604}
605
606fn transformation_subtype(expr: Option<&Expression>, terminals: &[TerminalField]) -> &'static str {
607 let Some(expr) = expr else {
608 return "TRANSFORMATION";
609 };
610 let unaliased = unalias(expr);
611 if expression_contains_aggregate(unaliased) {
612 return "AGGREGATION";
613 }
614 if terminals.len() == 1 {
615 if let Expression::Column(col) = unaliased {
616 if col.name.name == terminals[0].field {
617 return "IDENTITY";
618 }
619 }
620 }
621 "TRANSFORMATION"
622}
623
624fn collect_terminal_fields(node: &LineageNode, terminals: &mut BTreeSet<TerminalField>) {
625 if node.downstream.is_empty() {
626 if let Expression::Column(column) = &node.expression {
627 let table = if !node.source_name.is_empty() {
628 Some(node.source_name.clone())
629 } else if let Expression::Table(table) = &node.source {
630 Some(table_ref_qualified_name(table))
631 } else {
632 column.table.as_ref().map(|t| t.name.clone())
633 };
634 if let Some(table) = table.filter(|t| !t.is_empty()) {
635 terminals.insert(TerminalField {
636 table,
637 field: column.name.name.clone(),
638 });
639 }
640 }
641 return;
642 }
643
644 for child in &node.downstream {
645 collect_terminal_fields(child, terminals);
646 }
647}
648
/// Returns `true` when `expr` contains any of the aggregate expression variants listed below.
///
/// The search is delegated to `ExpressionWalk::contains`. Presumably it also visits nested
/// nodes (e.g. an aggregate inside a window or arithmetic expression) — confirm against the
/// traversal implementation.
///
/// NOTE(review): this list must be kept in sync by hand when new aggregate variants are
/// added to `Expression`.
fn expression_contains_aggregate(expr: &Expression) -> bool {
    expr.contains(|node| {
        matches!(
            node,
            Expression::AggregateFunction(_)
                | Expression::Sum(_)
                | Expression::Count(_)
                | Expression::Avg(_)
                | Expression::Min(_)
                | Expression::Max(_)
                | Expression::GroupConcat(_)
                | Expression::StringAgg(_)
                | Expression::ListAgg(_)
                | Expression::ArrayAgg(_)
                | Expression::CountIf(_)
                | Expression::SumIf(_)
                | Expression::Stddev(_)
                | Expression::StddevPop(_)
                | Expression::StddevSamp(_)
                | Expression::Variance(_)
                | Expression::VarPop(_)
                | Expression::VarSamp(_)
                | Expression::Median(_)
                | Expression::Mode(_)
                | Expression::First(_)
                | Expression::Last(_)
                | Expression::AnyValue(_)
                | Expression::ApproxDistinct(_)
                | Expression::ApproxCountDistinct(_)
                | Expression::ApproxPercentile(_)
                | Expression::Percentile(_)
                | Expression::LogicalAnd(_)
                | Expression::LogicalOr(_)
                | Expression::Skewness(_)
                | Expression::BitwiseCount(_)
                | Expression::ArrayConcatAgg(_)
                | Expression::ArrayUniqueAgg(_)
                | Expression::BoolXorAgg(_)
                | Expression::ParameterizedAgg(_)
                | Expression::ArgMax(_)
                | Expression::ArgMin(_)
                | Expression::ApproxTopK(_)
                | Expression::ApproxTopKAccumulate(_)
                | Expression::ApproxTopKCombine(_)
                | Expression::ApproxTopKEstimate(_)
                | Expression::ApproxTopSum(_)
                | Expression::ApproxQuantiles(_)
                | Expression::Grouping(_)
                | Expression::GroupingId(_)
                | Expression::AnonymousAggFunc(_)
                | Expression::CombinedAggFunc(_)
                | Expression::CombinedParameterizedAgg(_)
                | Expression::HashAgg(_)
                | Expression::ObjectAgg(_)
                | Expression::AIAgg(_)
        )
    })
}
707
708fn collect_input_datasets(
709 expr: &Expression,
710 options: &OpenLineageOptions,
711 output: Option<&OpenLineageDatasetId>,
712 warnings: &mut Vec<OpenLineageWarning>,
713) -> Result<Vec<OpenLineageDatasetId>> {
714 let cte_aliases = collect_cte_aliases(expr, options.dialect);
715 let mut seen = BTreeSet::new();
716 let mut result = Vec::new();
717
718 for table in expr.dfs().filter_map(|node| match node {
719 Expression::Table(table) => Some(table),
720 _ => None,
721 }) {
722 let qname = table_ref_qualified_name(table);
723 let normalized = normalize_identifier(&table.name.name, options.dialect, true);
724 if cte_aliases.contains(&normalized) {
725 continue;
726 }
727 if output
728 .map(|out| out.name == qname || out.name == table.name.name)
729 .unwrap_or(false)
730 {
731 continue;
732 }
733 match dataset_from_table_name(&qname, options) {
734 Ok(dataset) => {
735 if seen.insert((dataset.namespace.clone(), dataset.name.clone())) {
736 result.push(dataset);
737 }
738 }
739 Err(err) => warnings.push(OpenLineageWarning::new(
740 "W_UNRESOLVED_DATASET",
741 format!("Could not map input table '{qname}': {err}"),
742 )),
743 }
744 }
745
746 Ok(result)
747}
748
749fn attach_output_facets(
750 output: &mut OpenLineageDataset,
751 output_id: &OpenLineageDatasetId,
752 options: &OpenLineageOptions,
753 fields: &BTreeMap<String, ColumnLineageField>,
754) -> Result<()> {
755 let column_lineage = ColumnLineageDatasetFacet {
756 producer: options.producer.clone(),
757 schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
758 fields: fields.clone(),
759 };
760 output.facets.insert(
761 "columnLineage".to_string(),
762 serde_json::to_value(column_lineage).map_err(openlineage_serialization_error)?,
763 );
764
765 if let Some(schema_facet) = schema_facet_for_dataset(output_id, options) {
766 output.facets.insert(
767 "schema".to_string(),
768 serde_json::to_value(schema_facet).map_err(openlineage_serialization_error)?,
769 );
770 }
771
772 Ok(())
773}
774
775fn job_facets(sql: &str, options: &OpenLineageOptions) -> Value {
776 json!({
777 "sql": {
778 "_producer": options.producer,
779 "_schemaURL": SQL_JOB_FACET_SCHEMA_URL,
780 "query": sql,
781 "dialect": options.dialect.to_string(),
782 },
783 "jobType": {
784 "_producer": options.producer,
785 "_schemaURL": JOB_TYPE_JOB_FACET_SCHEMA_URL,
786 "processingType": "BATCH",
787 "integration": "POLYGLOT_SQL",
788 "jobType": "QUERY",
789 }
790 })
791}
792
/// The OpenLineage `SchemaDatasetFacet` for the output dataset.
///
/// Built from the table metadata in `OpenLineageOptions::schema`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacet {
    #[serde(rename = "_producer")]
    producer: String,
    #[serde(rename = "_schemaURL")]
    schema_url: String,
    /// Columns in the order they are declared in the validation schema.
    fields: Vec<SchemaDatasetFacetField>,
}

/// One column of a [`SchemaDatasetFacet`].
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacetField {
    name: String,
    /// Column type, serialized as `type`; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    #[serde(rename = "type")]
    data_type: String,
    /// 1-based column position as filled in by `schema_facet_for_dataset`.
    /// NOTE(review): serialized as `ordinal_position` (no rename) — confirm against the
    /// facet JSON schema.
    #[serde(skip_serializing_if = "Option::is_none")]
    ordinal_position: Option<usize>,
}
811
812fn schema_facet_for_dataset(
813 output: &OpenLineageDatasetId,
814 options: &OpenLineageOptions,
815) -> Option<SchemaDatasetFacet> {
816 let schema = options.schema.as_ref()?;
817 let table = schema.tables.iter().find(|table| {
818 let qname = if let Some(schema_name) = &table.schema {
819 format!("{}.{}", schema_name, table.name)
820 } else {
821 table.name.clone()
822 };
823 output.name == table.name || output.name == qname
824 })?;
825
826 Some(SchemaDatasetFacet {
827 producer: options.producer.clone(),
828 schema_url: SCHEMA_DATASET_FACET_SCHEMA_URL.to_string(),
829 fields: table
830 .columns
831 .iter()
832 .enumerate()
833 .map(|(idx, col)| SchemaDatasetFacetField {
834 name: col.name.clone(),
835 data_type: col.data_type.clone(),
836 ordinal_position: Some(idx + 1),
837 })
838 .collect(),
839 })
840}
841
842fn expand_star_output_fields(
843 select: &Select,
844 star_expr: &Expression,
845 schema: Option<&dyn Schema>,
846 warnings: &mut Vec<OpenLineageWarning>,
847 fields: &mut Vec<OutputField>,
848) {
849 let Some(schema) = schema else {
850 warnings.push(OpenLineageWarning::new(
851 "W_STAR_WITHOUT_SCHEMA",
852 "SELECT * cannot be expanded into OpenLineage column lineage without schema metadata",
853 ));
854 return;
855 };
856
857 let qualifier = star_qualifier(star_expr);
858 let sources = select_source_tables(select);
859 for (alias, qname) in sources {
860 if qualifier
861 .as_ref()
862 .map(|q| q != &alias && q != &qname)
863 .unwrap_or(false)
864 {
865 continue;
866 }
867 match schema.column_names(&qname) {
868 Ok(columns) => {
869 for name in columns {
870 fields.push(OutputField {
871 lineage_name: name.clone(),
872 name,
873 expression: None,
874 star_source_table: Some(qname.clone()),
875 });
876 }
877 }
878 Err(err) => warnings.push(OpenLineageWarning::new(
879 "W_STAR_SCHEMA_LOOKUP_FAILED",
880 format!("Could not expand SELECT * for table '{}': {}", qname, err),
881 )),
882 }
883 }
884}
885
886fn select_source_tables(select: &Select) -> Vec<(String, String)> {
887 let mut result = Vec::new();
888 if let Some(from) = &select.from {
889 for expr in &from.expressions {
890 collect_source_table(expr, &mut result);
891 }
892 }
893 for join in &select.joins {
894 collect_source_table(&join.this, &mut result);
895 }
896 result
897}
898
899fn collect_source_table(expr: &Expression, result: &mut Vec<(String, String)>) {
900 match expr {
901 Expression::Table(table) => {
902 let qname = table_ref_qualified_name(table);
903 let alias = table
904 .alias
905 .as_ref()
906 .map(|a| a.name.clone())
907 .unwrap_or_else(|| table.name.name.clone());
908 result.push((alias, qname));
909 }
910 Expression::Alias(alias) => collect_source_table(&alias.this, result),
911 Expression::Paren(paren) => collect_source_table(&paren.this, result),
912 _ => {}
913 }
914}
915
916fn leftmost_select(expr: &Expression) -> Option<&Select> {
917 match expr {
918 Expression::Prepare(prepare) => leftmost_select(&prepare.statement),
919 Expression::Select(select) => Some(select),
920 Expression::Union(union) => leftmost_select(&union.left),
921 Expression::Intersect(intersect) => leftmost_select(&intersect.left),
922 Expression::Except(except) => leftmost_select(&except.left),
923 Expression::Subquery(subquery) => leftmost_select(&subquery.this),
924 _ => None,
925 }
926}
927
928fn output_name(expr: &Expression) -> Option<String> {
929 match expr {
930 Expression::Alias(alias) => Some(alias.alias.name.clone()),
931 Expression::Column(col) => Some(col.name.name.clone()),
932 Expression::Identifier(id) => Some(id.name.clone()),
933 Expression::Annotated(a) => output_name(&a.this),
934 _ => None,
935 }
936}
937
938fn unalias(expr: &Expression) -> &Expression {
939 match expr {
940 Expression::Alias(alias) => &alias.this,
941 Expression::Annotated(a) => unalias(&a.this),
942 _ => expr,
943 }
944}
945
946fn is_star_expr(expr: &Expression) -> bool {
947 matches!(expr, Expression::Star(_))
948 || matches!(expr, Expression::Column(col) if col.name.name == "*")
949}
950
951fn star_qualifier(expr: &Expression) -> Option<String> {
952 match expr {
953 Expression::Star(star) => star.table.as_ref().map(|t| t.name.clone()),
954 Expression::Column(col) if col.name.name == "*" => {
955 col.table.as_ref().map(|t| t.name.clone())
956 }
957 _ => None,
958 }
959}
960
961fn dataset_from_expression(
962 expr: &Expression,
963 options: &OpenLineageOptions,
964) -> Result<OpenLineageDatasetId> {
965 match expr {
966 Expression::Table(table) => dataset_from_table_ref(table, options),
967 Expression::Identifier(id) => dataset_from_table_name(&id.name, options),
968 _ => Err(Error::unsupported(
969 "OpenLineage dataset extraction from non-table expression",
970 options.dialect.to_string(),
971 )),
972 }
973}
974
975fn dataset_from_table_ref(
976 table: &TableRef,
977 options: &OpenLineageOptions,
978) -> Result<OpenLineageDatasetId> {
979 dataset_from_table_name(&table_ref_qualified_name(table), options)
980}
981
982fn dataset_from_table_name(
983 table_name: &str,
984 options: &OpenLineageOptions,
985) -> Result<OpenLineageDatasetId> {
986 if let Some(mapped) = options.dataset_mappings.get(table_name) {
987 return Ok(mapped.clone());
988 }
989 let namespace = options.dataset_namespace.as_ref().ok_or_else(|| {
990 Error::parse(
991 format!(
992 "Missing datasetNamespace or explicit dataset mapping for table '{}'",
993 table_name
994 ),
995 0,
996 0,
997 0,
998 0,
999 )
1000 })?;
1001 Ok(OpenLineageDatasetId::new(namespace, table_name))
1002}
1003
1004fn table_ref_qualified_name(table: &TableRef) -> String {
1005 let mut parts = Vec::new();
1006 if let Some(catalog) = &table.catalog {
1007 parts.push(catalog.name.clone());
1008 }
1009 if let Some(schema) = &table.schema {
1010 parts.push(schema.name.clone());
1011 }
1012 parts.push(table.name.name.clone());
1013 parts.join(".")
1014}
1015
1016fn collect_cte_aliases(expr: &Expression, dialect: DialectType) -> HashSet<String> {
1017 let mut aliases = HashSet::new();
1018 for node in expr.dfs() {
1019 match node {
1020 Expression::Select(select) => {
1021 if let Some(with) = &select.with {
1022 collect_with_aliases(with, dialect, &mut aliases);
1023 }
1024 }
1025 Expression::Union(union) => {
1026 if let Some(with) = &union.with {
1027 collect_with_aliases(with, dialect, &mut aliases);
1028 }
1029 }
1030 Expression::Intersect(intersect) => {
1031 if let Some(with) = &intersect.with {
1032 collect_with_aliases(with, dialect, &mut aliases);
1033 }
1034 }
1035 Expression::Except(except) => {
1036 if let Some(with) = &except.with {
1037 collect_with_aliases(with, dialect, &mut aliases);
1038 }
1039 }
1040 _ => {}
1041 }
1042 }
1043 aliases
1044}
1045
1046fn collect_with_aliases(with: &With, dialect: DialectType, aliases: &mut HashSet<String>) {
1047 for cte in &with.ctes {
1048 aliases.insert(normalize_identifier(&cte.alias.name, dialect, true));
1049 }
1050}
1051
1052fn normalize_identifier(name: &str, dialect: DialectType, is_table: bool) -> String {
1053 crate::schema::normalize_name(name, Some(dialect), is_table, true)
1054}
1055
1056fn openlineage_serialization_error(err: serde_json::Error) -> Error {
1057 Error::internal(format!("OpenLineage serialization failed: {err}"))
1058}
1059
1060fn deserialize_dialect_type<'de, D>(deserializer: D) -> std::result::Result<DialectType, D::Error>
1061where
1062 D: Deserializer<'de>,
1063{
1064 let value = String::deserialize(deserializer)?;
1065 value.parse::<DialectType>().map_err(de::Error::custom)
1066}
1067
#[cfg(test)]
mod tests {
    use super::*;

    /// Dataset namespace used by the default PostgreSQL fixture.
    const WAREHOUSE_NAMESPACE: &str = "postgres://warehouse";
    /// Job namespace stamped on generated events.
    const JOB_NAMESPACE: &str = "polyglot-tests";
    /// Fixed run id so run-event payloads are deterministic.
    const RUN_ID: &str = "3b452093-782c-4ef2-9c0c-aafe2aa6f34d";

    /// Baseline options: PostgreSQL dialect, an explicit `analytics.out` output
    /// dataset, and fixed job/run metadata with a `COMPLETE` event type.
    fn options() -> OpenLineageOptions {
        OpenLineageOptions {
            dialect: DialectType::PostgreSQL,
            producer: "https://github.com/tobilg/polyglot".to_string(),
            dataset_namespace: Some(WAREHOUSE_NAMESPACE.to_string()),
            output_dataset: Some(OpenLineageDatasetId::new(
                WAREHOUSE_NAMESPACE,
                "analytics.out",
            )),
            job_namespace: Some(JOB_NAMESPACE.to_string()),
            job_name: Some("lineage-test".to_string()),
            event_time: Some("2026-05-18T00:00:00Z".to_string()),
            run_id: Some(RUN_ID.to_string()),
            event_type: Some(OpenLineageRunEventType::Complete),
            ..Default::default()
        }
    }

    /// The baseline options switched to the BigQuery dialect.
    fn bigquery_options() -> OpenLineageOptions {
        OpenLineageOptions {
            dialect: DialectType::BigQuery,
            ..options()
        }
    }

    /// The baseline options with no explicit output dataset.
    fn options_without_output() -> OpenLineageOptions {
        OpenLineageOptions {
            output_dataset: None,
            ..options()
        }
    }

    /// Builds a schema column with only a name and data type set.
    fn schema_column(name: &str, data_type: &str) -> crate::validation::SchemaColumn {
        crate::validation::SchemaColumn {
            name: name.to_string(),
            data_type: data_type.to_string(),
            nullable: None,
            primary_key: false,
            unique: false,
            references: None,
        }
    }

    #[test]
    fn deserializes_dialect_aliases_in_options() {
        let json = r#"{"producer":"polyglot","dialect":"postgres"}"#;
        let parsed: OpenLineageOptions = serde_json::from_str(json).expect("options");
        assert_eq!(parsed.dialect, DialectType::PostgreSQL);
    }

    #[test]
    fn emits_identity_column_lineage_for_select() {
        let result = openlineage_column_lineage("SELECT a FROM t", &options()).expect("lineage");
        let inputs = &result.facet.fields.get("a").expect("field a").input_fields;
        assert_eq!(inputs.len(), 1);
        let input = &inputs[0];
        assert_eq!(input.name, "t");
        assert_eq!(input.field, "a");
        assert_eq!(input.transformations[0].subtype, "IDENTITY");
    }

    #[test]
    fn emits_column_lineage_for_prepared_statement_body() {
        let sql = "PREPARE leak AS SELECT id FROM sensitive_table WHERE id = $1";
        let result = openlineage_column_lineage(sql, &options()).expect("lineage");
        let inputs = &result.facet.fields.get("id").expect("field id").input_fields;
        assert_eq!(inputs.len(), 1);
        assert_eq!(inputs[0].name, "sensitive_table");
        assert_eq!(inputs[0].field, "id");
    }

    #[test]
    fn resolves_input_dataset_behind_table_alias() {
        let sql = "SELECT o.total FROM orders o";
        let result = openlineage_column_lineage(sql, &options()).expect("lineage");
        let first = &result.facet.fields.get("total").expect("field total").input_fields[0];
        assert_eq!(first.name, "orders");
        assert_eq!(first.field, "total");
    }

    #[test]
    fn emits_transformation_column_lineage_for_expression() {
        let sql = "SELECT a + b AS c FROM t";
        let result = openlineage_column_lineage(sql, &options()).expect("lineage");
        let inputs = &result.facet.fields.get("c").expect("field c").input_fields;
        assert_eq!(inputs.len(), 2);
        for column in ["a", "b"] {
            assert!(inputs.iter().any(|f| f.field == column));
        }
        assert!(inputs
            .iter()
            .all(|f| f.transformations[0].subtype == "TRANSFORMATION"));
    }

    #[test]
    fn omits_bigquery_safe_namespace_from_column_lineage_issue207() {
        let sql = r#"
WITH import_cte AS (
  SELECT timestamp, data, operation
  FROM `project`.`dataset`.`source_table`
),
transform_cte AS (
  SELECT
    timestamp,
    SAFE.PARSE_JSON(data) AS json_data
  FROM import_cte
)
SELECT json_data FROM transform_cte
"#;
        let result = openlineage_column_lineage(sql, &bigquery_options()).expect("lineage");
        let field = result.facet.fields.get("json_data").expect("json_data");
        let inputs = &field.input_fields;

        assert!(
            inputs.iter().any(|input| input.field == "data"),
            "expected data input field, got {:?}",
            field.input_fields
        );
        // SAFE. is a BigQuery function-namespace prefix, never a column.
        assert!(
            inputs
                .iter()
                .all(|input| !input.field.eq_ignore_ascii_case("safe")),
            "did not expect SAFE namespace as input field, got {:?}",
            field.input_fields
        );
    }

    #[test]
    fn emits_bigquery_unnest_alias_column_lineage_issue209() {
        let opts = OpenLineageOptions {
            dataset_namespace: Some("bigquery://warehouse".to_string()),
            output_dataset: Some(OpenLineageDatasetId::new(
                "bigquery://warehouse",
                "calendar",
            )),
            ..bigquery_options()
        };
        let sql = r#"
SELECT date_val AS week_start
FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
"#;
        let result = openlineage_column_lineage(sql, &opts).expect("lineage");
        let field = result.facet.fields.get("week_start").expect("week_start");

        assert_eq!(field.input_fields.len(), 1);
        let only = &field.input_fields[0];
        assert_eq!(only.name, "date_val");
        assert_eq!(only.field, "date_val");
        assert!(
            !result
                .warnings
                .iter()
                .any(|warning| warning.code == "W_EMPTY_FIELD_LINEAGE"),
            "did not expect empty-lineage warning, got {:?}",
            result.warnings
        );
    }

    #[test]
    fn emits_aggregation_column_lineage() {
        let sql = "SELECT SUM(amount) AS total FROM orders";
        let result = openlineage_column_lineage(sql, &options()).expect("lineage");
        let first = &result.facet.fields.get("total").expect("field total").input_fields[0];
        assert_eq!(first.field, "amount");
        assert_eq!(first.transformations[0].subtype, "AGGREGATION");
    }

    #[test]
    fn infers_insert_output_dataset() {
        let sql = "INSERT INTO analytics.out SELECT a FROM raw.input";
        let result = openlineage_column_lineage(sql, &options_without_output()).expect("lineage");
        assert_eq!(result.outputs[0].name, "analytics.out");
        assert_eq!(result.inputs[0].name, "raw.input");
    }

    #[test]
    fn maps_insert_target_columns_to_output_fields() {
        let sql = "INSERT INTO analytics.out (target_a) SELECT source_a FROM raw.input";
        let result = openlineage_column_lineage(sql, &options_without_output()).expect("lineage");
        let fields = &result.facet.fields;
        let target = fields.get("target_a").expect("target field");
        assert_eq!(target.input_fields[0].field, "source_a");
        // The source column name must not leak into the output facet.
        assert!(!fields.contains_key("source_a"));
    }

    #[test]
    fn pure_select_requires_output_dataset() {
        let message = openlineage_column_lineage("SELECT a FROM t", &options_without_output())
            .unwrap_err()
            .to_string();
        assert!(message.contains("outputDataset is required"));
    }

    #[test]
    fn emits_job_event_payload() {
        let result = openlineage_job_event("SELECT a FROM t", &options()).expect("event");
        let event = &result.event;
        assert_eq!(event["job"]["namespace"], JOB_NAMESPACE);
        assert_eq!(
            event["job"]["facets"]["sql"]["_schemaURL"],
            SQL_JOB_FACET_SCHEMA_URL
        );
        let lineage_fields = &event["outputs"][0]["facets"]["columnLineage"]["fields"];
        assert_eq!(lineage_fields["a"]["inputFields"][0]["field"], "a");
    }

    #[test]
    fn emits_run_event_payload() {
        let result = openlineage_run_event("SELECT a FROM t", &options()).expect("event");
        assert_eq!(result.event["eventType"], "COMPLETE");
        assert_eq!(result.event["run"]["runId"], RUN_ID);
    }

    #[test]
    fn select_star_without_schema_warns() {
        let result = openlineage_column_lineage("SELECT * FROM t", &options()).expect("lineage");
        assert!(result.facet.fields.is_empty());
        let warned = result
            .warnings
            .iter()
            .any(|w| w.code == "W_STAR_WITHOUT_SCHEMA");
        assert!(warned);
    }

    #[test]
    fn select_star_with_schema_expands_fields() {
        let table = crate::validation::SchemaTable {
            name: "t".to_string(),
            schema: None,
            columns: vec![schema_column("a", "INT"), schema_column("b", "TEXT")],
            aliases: vec![],
            primary_key: vec![],
            unique_keys: vec![],
            foreign_keys: vec![],
        };
        let opts = OpenLineageOptions {
            schema: Some(ValidationSchema {
                strict: None,
                tables: vec![table],
            }),
            ..options()
        };

        let result = openlineage_column_lineage("SELECT * FROM t", &opts).expect("lineage");
        for name in ["a", "b"] {
            assert!(result.facet.fields.contains_key(name));
        }
    }
}