1use crate::dialects::{Dialect, DialectType};
8use crate::expressions::*;
9use crate::lineage::{self, LineageNode};
10use crate::schema::Schema;
11use crate::scope::SourceKind;
12use crate::traversal::ExpressionWalk;
13use crate::{mapping_schema_from_validation_schema, Error, Result, ValidationSchema};
14use serde::de::{self, Deserializer};
15use serde::{Deserialize, Serialize};
16use serde_json::{json, Value};
17use std::collections::{BTreeMap, BTreeSet, HashSet};
18
/// `schemaURL` written on every run/job event produced by this module.
pub const OPENLINEAGE_SCHEMA_URL: &str = "https://openlineage.io/spec/2-0-2/OpenLineage.json";
/// `_schemaURL` of the `columnLineage` dataset facet attached to output datasets.
pub const COLUMN_LINEAGE_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json";
/// `_schemaURL` of the `sql` job facet (carries the original query text and dialect).
pub const SQL_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-1-0/SQLJobFacet.json";
/// `_schemaURL` of the `jobType` job facet.
pub const JOB_TYPE_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json";
/// `_schemaURL` of the `schema` dataset facet, emitted only when schema metadata for the
/// output table is supplied in the options.
pub const SCHEMA_DATASET_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/SchemaDatasetFacet.json";
28
/// Identifies an OpenLineage dataset by its `namespace` and `name`.
///
/// Field order is significant: the derived `PartialOrd`/`Ord` compare `namespace` first
/// and then `name`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageDatasetId {
    /// Dataset namespace (the tests in this file use a URI such as `postgres://warehouse`).
    pub namespace: String,
    /// Dataset name within the namespace; for unmapped tables this is the
    /// dot-joined `catalog.schema.table` name as written in the SQL.
    pub name: String,
}
36
37impl OpenLineageDatasetId {
38 pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
39 Self {
40 namespace: namespace.into(),
41 name: name.into(),
42 }
43 }
44}
45
/// Options controlling OpenLineage generation.
///
/// Deserializes from camelCase JSON; any missing field takes its `Default` value.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct OpenLineageOptions {
    /// SQL dialect used for parsing and lineage. Deserialized from a string through
    /// `DialectType`'s `FromStr`, so accepted spellings (e.g. `"postgres"`) follow that impl.
    #[serde(deserialize_with = "deserialize_dialect_type")]
    pub dialect: DialectType,
    /// Producer URI written to events and facets; must be non-blank.
    pub producer: String,
    /// Fallback namespace for tables that have no entry in `dataset_mappings`.
    pub dataset_namespace: Option<String>,
    /// Explicit table-name → dataset overrides, keyed by the qualified table name
    /// exactly as it appears in the SQL.
    pub dataset_mappings: BTreeMap<String, OpenLineageDatasetId>,
    /// Output dataset for plain SELECT statements (required when there is no SELECT INTO).
    pub output_dataset: Option<OpenLineageDatasetId>,
    /// Optional schema metadata; enables `SELECT *` expansion, schema-aware lineage and
    /// the `schema` dataset facet.
    pub schema: Option<ValidationSchema>,
    /// Job namespace; required by the job and run event builders.
    pub job_namespace: Option<String>,
    /// Job name; required by the job and run event builders.
    pub job_name: Option<String>,
    /// Event timestamp string copied verbatim into `eventTime`; required for events.
    pub event_time: Option<String>,
    /// Run identifier; required by the run event builder.
    pub run_id: Option<String>,
    /// Run event type; required by the run event builder.
    pub event_type: Option<OpenLineageRunEventType>,
}
63
/// `eventType` of an OpenLineage run event, serialized in SCREAMING_SNAKE_CASE
/// (`START`, `RUNNING`, `COMPLETE`, `ABORT`, `FAIL`, `OTHER`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum OpenLineageRunEventType {
    Start,
    Running,
    Complete,
    Abort,
    Fail,
    Other,
}
75
/// Non-fatal diagnostic produced while generating lineage.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageWarning {
    /// Stable machine-readable code, e.g. `W_UNRESOLVED_DATASET`.
    pub code: String,
    /// Human-readable explanation.
    pub message: String,
}
83
84impl OpenLineageWarning {
85 fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
86 Self {
87 code: code.into(),
88 message: message.into(),
89 }
90 }
91}
92
/// Result of `openlineage_column_lineage`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageColumnLineageResult {
    /// The `columnLineage` facet; the same content is also attached to `outputs[0]`.
    pub facet: ColumnLineageDatasetFacet,
    /// Input datasets referenced by the statement (CTEs and the output table excluded).
    pub inputs: Vec<OpenLineageDataset>,
    /// The single output dataset, with its facets attached.
    pub outputs: Vec<OpenLineageDataset>,
    /// Diagnostics collected while resolving lineage.
    pub warnings: Vec<OpenLineageWarning>,
}
101
/// A generated OpenLineage event (as raw JSON) plus the lineage warnings behind it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageEventResult {
    /// The complete event document.
    pub event: Value,
    /// Diagnostics collected while computing the event's column lineage.
    pub warnings: Vec<OpenLineageWarning>,
}
108
/// An OpenLineage dataset entry as it appears in `inputs`/`outputs`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OpenLineageDataset {
    pub namespace: String,
    pub name: String,
    /// Dataset facets keyed by facet name (e.g. `columnLineage`, `schema`);
    /// omitted from JSON when empty.
    #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
    pub facets: BTreeMap<String, Value>,
}
116
117impl std::convert::From<OpenLineageDatasetId> for OpenLineageDataset {
118 fn from(id: OpenLineageDatasetId) -> Self {
119 Self {
120 namespace: id.namespace,
121 name: id.name,
122 facets: BTreeMap::new(),
123 }
124 }
125}
126
/// OpenLineage `columnLineage` dataset facet.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnLineageDatasetFacet {
    #[serde(rename = "_producer")]
    pub producer: String,
    #[serde(rename = "_schemaURL")]
    pub schema_url: String,
    /// Output column name → the input fields it is derived from.
    pub fields: BTreeMap<String, ColumnLineageField>,
}
135
/// Lineage for one output column: the list of input fields feeding it.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnLineageField {
    pub input_fields: Vec<OpenLineageInputField>,
}
141
/// A single upstream column (dataset + field) contributing to an output column.
///
/// Field order is significant for the derived `PartialOrd`/`Ord`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageInputField {
    pub namespace: String,
    pub name: String,
    /// Column name within the input dataset.
    pub field: String,
    /// How the input is transformed; omitted from JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub transformations: Vec<OpenLineageTransformation>,
}
151
/// Describes how an input field is transformed into an output field.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OpenLineageTransformation {
    /// Serialized as `type`; this module always emits `DIRECT`.
    #[serde(rename = "type")]
    pub type_: String,
    /// This module emits `IDENTITY`, `TRANSFORMATION` or `AGGREGATION`.
    pub subtype: String,
    /// Optional SQL text of the projecting expression; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Optional masking flag; omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub masking: Option<bool>,
}
162
/// The pieces of a supported statement that lineage generation needs.
#[derive(Debug, Clone)]
struct StatementAnalysis {
    /// The query whose projection defines the output columns.
    query: Expression,
    /// De-duplicated input datasets referenced by `query`.
    inputs: Vec<OpenLineageDatasetId>,
    /// Dataset written by the statement.
    output: OpenLineageDatasetId,
    /// Explicit target column list (INSERT columns / CREATE TABLE columns); may be empty.
    output_column_names: Vec<String>,
}
170
/// One projected output column.
#[derive(Debug, Clone)]
struct OutputField {
    /// Name reported in the facet (may be replaced by a target column name).
    name: String,
    /// Name used to look the column up in the lineage graph.
    lineage_name: String,
    /// The projection expression; `None` for columns expanded from `SELECT *`.
    expression: Option<Expression>,
    /// For star-expanded columns, the qualified source table they came from.
    star_source_table: Option<String>,
}
178
/// A leaf column of the lineage graph: a field on a concrete source table.
///
/// Stored in a `BTreeSet`; the derived ordering (table, then field) determines the
/// order of emitted input fields.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct TerminalField {
    table: String,
    field: String,
}
184
185pub fn openlineage_column_lineage(
187 sql: &str,
188 options: &OpenLineageOptions,
189) -> Result<OpenLineageColumnLineageResult> {
190 validate_common_options(options)?;
191
192 let mut warnings = Vec::new();
193 let schema_mapping = options
194 .schema
195 .as_ref()
196 .map(mapping_schema_from_validation_schema);
197 let dialect = Dialect::get(options.dialect);
198 let mut expressions = dialect.parse(sql)?;
199 if expressions.len() != 1 {
200 return Err(Error::parse(
201 format!(
202 "OpenLineage generation expects exactly one statement, found {}",
203 expressions.len()
204 ),
205 0,
206 0,
207 0,
208 0,
209 ));
210 }
211
212 let expr = expressions.remove(0);
213 let analysis = analyze_statement(&expr, options, &mut warnings)?;
214 let mut output_fields = output_fields_for_query(
215 &analysis.query,
216 schema_mapping.as_ref().map(|s| s as &dyn Schema),
217 options.dialect,
218 &mut warnings,
219 )?;
220 apply_output_column_names(
221 &mut output_fields,
222 &analysis.output_column_names,
223 &mut warnings,
224 );
225
226 let mut fields = BTreeMap::new();
227 for output_field in output_fields {
228 if fields.contains_key(&output_field.name) {
229 warnings.push(OpenLineageWarning::new(
230 "W_DUPLICATE_OUTPUT_FIELD",
231 format!(
232 "Duplicate output field '{}' was merged in the OpenLineage fields map",
233 output_field.name
234 ),
235 ));
236 }
237
238 let input_fields = input_fields_for_output(
239 &analysis.query,
240 &output_field,
241 options,
242 schema_mapping.as_ref().map(|s| s as &dyn Schema),
243 &mut warnings,
244 )?;
245
246 fields.insert(output_field.name, ColumnLineageField { input_fields });
247 }
248
249 let mut outputs = vec![OpenLineageDataset::from(analysis.output.clone())];
250 attach_output_facets(&mut outputs[0], &analysis.output, options, &fields)?;
251
252 Ok(OpenLineageColumnLineageResult {
253 facet: ColumnLineageDatasetFacet {
254 producer: options.producer.clone(),
255 schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
256 fields,
257 },
258 inputs: analysis
259 .inputs
260 .into_iter()
261 .map(OpenLineageDataset::from)
262 .collect(),
263 outputs,
264 warnings,
265 })
266}
267
268pub fn openlineage_job_event(
270 sql: &str,
271 options: &OpenLineageOptions,
272) -> Result<OpenLineageEventResult> {
273 let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
274 let job_name = required_option(&options.job_name, "jobName")?;
275 let event_time = required_option(&options.event_time, "eventTime")?;
276
277 let result = openlineage_column_lineage(sql, options)?;
278 let event = json!({
279 "eventTime": event_time,
280 "producer": options.producer,
281 "schemaURL": OPENLINEAGE_SCHEMA_URL,
282 "job": {
283 "namespace": job_namespace,
284 "name": job_name,
285 "facets": job_facets(sql, options),
286 },
287 "inputs": result.inputs,
288 "outputs": result.outputs,
289 });
290
291 Ok(OpenLineageEventResult {
292 event,
293 warnings: result.warnings,
294 })
295}
296
297pub fn openlineage_run_event(
299 sql: &str,
300 options: &OpenLineageOptions,
301) -> Result<OpenLineageEventResult> {
302 let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
303 let job_name = required_option(&options.job_name, "jobName")?;
304 let event_time = required_option(&options.event_time, "eventTime")?;
305 let run_id = required_option(&options.run_id, "runId")?;
306 let event_type = options
307 .event_type
308 .ok_or_else(|| Error::parse("Missing required option: eventType", 0, 0, 0, 0))?;
309
310 let result = openlineage_column_lineage(sql, options)?;
311 let event = json!({
312 "eventTime": event_time,
313 "eventType": event_type,
314 "producer": options.producer,
315 "schemaURL": OPENLINEAGE_SCHEMA_URL,
316 "run": {
317 "runId": run_id,
318 "facets": {},
319 },
320 "job": {
321 "namespace": job_namespace,
322 "name": job_name,
323 "facets": job_facets(sql, options),
324 },
325 "inputs": result.inputs,
326 "outputs": result.outputs,
327 });
328
329 Ok(OpenLineageEventResult {
330 event,
331 warnings: result.warnings,
332 })
333}
334
335fn validate_common_options(options: &OpenLineageOptions) -> Result<()> {
336 if options.producer.trim().is_empty() {
337 return Err(Error::parse(
338 "Missing required option: producer",
339 0,
340 0,
341 0,
342 0,
343 ));
344 }
345 Ok(())
346}
347
348fn required_option(value: &Option<String>, name: &str) -> Result<String> {
349 match value.as_ref().filter(|v| !v.trim().is_empty()) {
350 Some(value) => Ok(value.clone()),
351 None => Err(Error::parse(
352 format!("Missing required option: {name}"),
353 0,
354 0,
355 0,
356 0,
357 )),
358 }
359}
360
361fn analyze_statement(
362 expr: &Expression,
363 options: &OpenLineageOptions,
364 warnings: &mut Vec<OpenLineageWarning>,
365) -> Result<StatementAnalysis> {
366 match expr {
367 Expression::Prepare(prepare) => analyze_statement(&prepare.statement, options, warnings),
368 Expression::Select(select) => {
369 let output = if let Some(into) = &select.into {
370 dataset_from_expression(&into.this, options)?
371 } else {
372 options.output_dataset.clone().ok_or_else(|| {
373 Error::parse(
374 "OpenLineage outputDataset is required for SELECT statements without SELECT INTO",
375 0,
376 0,
377 0,
378 0,
379 )
380 })?
381 };
382 Ok(StatementAnalysis {
383 query: expr.clone(),
384 inputs: collect_input_datasets(expr, options, Some(&output), warnings)?,
385 output,
386 output_column_names: Vec::new(),
387 })
388 }
389 Expression::Insert(insert) => {
390 let output = dataset_from_table_ref(&insert.table, options)?;
391 let query = insert.query.clone().ok_or_else(|| {
392 Error::unsupported(
393 "OpenLineage column lineage for INSERT without query",
394 options.dialect.to_string(),
395 )
396 })?;
397 Ok(StatementAnalysis {
398 inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
399 query,
400 output,
401 output_column_names: insert.columns.iter().map(|col| col.name.clone()).collect(),
402 })
403 }
404 Expression::CreateTable(create) => {
405 let output = dataset_from_table_ref(&create.name, options)?;
406 let query = create.as_select.clone().ok_or_else(|| {
407 Error::unsupported(
408 "OpenLineage column lineage for CREATE TABLE without AS SELECT",
409 options.dialect.to_string(),
410 )
411 })?;
412 Ok(StatementAnalysis {
413 inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
414 query,
415 output,
416 output_column_names: create
417 .columns
418 .iter()
419 .map(|col| col.name.name.clone())
420 .collect(),
421 })
422 }
423 _ => Err(Error::unsupported(
424 format!("OpenLineage generation for {}", expr.variant_name()),
425 options.dialect.to_string(),
426 )),
427 }
428}
429
430fn output_fields_for_query(
431 query: &Expression,
432 schema: Option<&dyn Schema>,
433 dialect: DialectType,
434 warnings: &mut Vec<OpenLineageWarning>,
435) -> Result<Vec<OutputField>> {
436 let select = leftmost_select(query).ok_or_else(|| {
437 Error::unsupported(
438 "OpenLineage output field extraction for non-SELECT query",
439 dialect.to_string(),
440 )
441 })?;
442
443 let mut fields = Vec::new();
444 for (idx, expr) in select.expressions.iter().enumerate() {
445 if is_star_expr(expr) {
446 expand_star_output_fields(select, expr, schema, warnings, &mut fields);
447 continue;
448 }
449
450 let name = output_name(expr).unwrap_or_else(|| format!("_{idx}"));
451 fields.push(OutputField {
452 lineage_name: name.clone(),
453 name,
454 expression: Some(expr.clone()),
455 star_source_table: None,
456 });
457 }
458 Ok(fields)
459}
460
461fn apply_output_column_names(
462 fields: &mut [OutputField],
463 output_column_names: &[String],
464 warnings: &mut Vec<OpenLineageWarning>,
465) {
466 if output_column_names.is_empty() {
467 return;
468 }
469 if output_column_names.len() != fields.len() {
470 warnings.push(OpenLineageWarning::new(
471 "W_OUTPUT_COLUMN_COUNT_MISMATCH",
472 format!(
473 "Target column count ({}) does not match projected column count ({})",
474 output_column_names.len(),
475 fields.len()
476 ),
477 ));
478 return;
479 }
480 for (field, output_name) in fields.iter_mut().zip(output_column_names) {
481 field.name = output_name.clone();
482 }
483}
484
485fn input_fields_for_output(
486 query: &Expression,
487 output_field: &OutputField,
488 options: &OpenLineageOptions,
489 schema: Option<&dyn Schema>,
490 warnings: &mut Vec<OpenLineageWarning>,
491) -> Result<Vec<OpenLineageInputField>> {
492 if let Some(table) = &output_field.star_source_table {
493 return terminal_fields_to_openlineage(
494 vec![TerminalField {
495 table: table.clone(),
496 field: output_field.lineage_name.clone(),
497 }],
498 "IDENTITY",
499 Some(format!("SELECT {}", output_field.lineage_name)),
500 options,
501 warnings,
502 );
503 }
504
505 let lineage_result = if let Some(schema) = schema {
506 lineage::lineage_with_schema(
507 &output_field.lineage_name,
508 query,
509 Some(schema),
510 Some(options.dialect),
511 false,
512 )
513 } else {
514 lineage::lineage(
515 &output_field.lineage_name,
516 query,
517 Some(options.dialect),
518 false,
519 )
520 };
521
522 let node = match lineage_result {
523 Ok(node) => node,
524 Err(err) => {
525 warnings.push(OpenLineageWarning::new(
526 "W_UNRESOLVED_OUTPUT_FIELD",
527 format!(
528 "Could not resolve lineage for output field '{}': {}",
529 output_field.name, err
530 ),
531 ));
532 return Ok(Vec::new());
533 }
534 };
535
536 let mut terminals = BTreeSet::new();
537 collect_terminal_fields(&node, &mut terminals);
538 let terminals: Vec<TerminalField> = terminals.into_iter().collect();
539
540 if terminals.is_empty() {
541 if has_virtual_terminal(&node) {
542 return Ok(Vec::new());
543 }
544 warnings.push(OpenLineageWarning::new(
545 "W_EMPTY_FIELD_LINEAGE",
546 format!(
547 "No input fields were found for output field '{}'",
548 output_field.name
549 ),
550 ));
551 return Ok(Vec::new());
552 }
553
554 let subtype = transformation_subtype(output_field.expression.as_ref(), &terminals);
555 let description = output_field
556 .expression
557 .as_ref()
558 .and_then(|expr| transformation_description(expr, options.dialect));
559
560 terminal_fields_to_openlineage(terminals, subtype, description, options, warnings)
561}
562
/// Renders `expr` back to SQL in `dialect` for use as a transformation description.
///
/// Only available with the `generate` feature; without it this always returns `None`.
fn transformation_description(expr: &Expression, dialect: DialectType) -> Option<String> {
    #[cfg(feature = "generate")]
    {
        Some(expr.sql_for(dialect))
    }

    #[cfg(not(feature = "generate"))]
    {
        // Silence unused-parameter warnings when SQL generation is compiled out.
        let _ = (expr, dialect);
        None
    }
}
575
576fn terminal_fields_to_openlineage(
577 terminals: Vec<TerminalField>,
578 subtype: &str,
579 description: Option<String>,
580 options: &OpenLineageOptions,
581 warnings: &mut Vec<OpenLineageWarning>,
582) -> Result<Vec<OpenLineageInputField>> {
583 let mut result = Vec::new();
584 for terminal in terminals {
585 let dataset = dataset_from_table_name(&terminal.table, options).map_err(|err| {
586 warnings.push(OpenLineageWarning::new(
587 "W_UNRESOLVED_DATASET",
588 format!(
589 "Could not map table '{}' to an OpenLineage dataset: {}",
590 terminal.table, err
591 ),
592 ));
593 err
594 })?;
595 result.push(OpenLineageInputField {
596 namespace: dataset.namespace,
597 name: dataset.name,
598 field: terminal.field,
599 transformations: vec![OpenLineageTransformation {
600 type_: "DIRECT".to_string(),
601 subtype: subtype.to_string(),
602 description: description.clone(),
603 masking: Some(false),
604 }],
605 });
606 }
607 Ok(result)
608}
609
610fn transformation_subtype(expr: Option<&Expression>, terminals: &[TerminalField]) -> &'static str {
611 let Some(expr) = expr else {
612 return "TRANSFORMATION";
613 };
614 let unaliased = unalias(expr);
615 if expression_contains_aggregate(unaliased) {
616 return "AGGREGATION";
617 }
618 if terminals.len() == 1 {
619 if let Expression::Column(col) = unaliased {
620 if col.name.name == terminals[0].field {
621 return "IDENTITY";
622 }
623 }
624 }
625 "TRANSFORMATION"
626}
627
628fn collect_terminal_fields(node: &LineageNode, terminals: &mut BTreeSet<TerminalField>) {
629 if node.downstream.is_empty() {
630 if node.source_kind == SourceKind::Virtual {
631 return;
632 }
633 if let Expression::Column(column) = &node.expression {
634 let table = if !node.source_name.is_empty() {
635 Some(node.source_name.clone())
636 } else if let Expression::Table(table) = &node.source {
637 Some(table_ref_qualified_name(table))
638 } else {
639 column.table.as_ref().map(|t| t.name.clone())
640 };
641 if let Some(table) = table.filter(|t| !t.is_empty()) {
642 terminals.insert(TerminalField {
643 table,
644 field: column.name.name.clone(),
645 });
646 }
647 }
648 return;
649 }
650
651 for child in &node.downstream {
652 collect_terminal_fields(child, terminals);
653 }
654}
655
656fn has_virtual_terminal(node: &LineageNode) -> bool {
657 if node.downstream.is_empty() {
658 return node.source_kind == SourceKind::Virtual;
659 }
660 node.downstream.iter().any(has_virtual_terminal)
661}
662
663fn expression_contains_aggregate(expr: &Expression) -> bool {
664 expr.contains(|node| {
665 matches!(
666 node,
667 Expression::AggregateFunction(_)
668 | Expression::Sum(_)
669 | Expression::Count(_)
670 | Expression::Avg(_)
671 | Expression::Min(_)
672 | Expression::Max(_)
673 | Expression::GroupConcat(_)
674 | Expression::StringAgg(_)
675 | Expression::ListAgg(_)
676 | Expression::ArrayAgg(_)
677 | Expression::CountIf(_)
678 | Expression::SumIf(_)
679 | Expression::Stddev(_)
680 | Expression::StddevPop(_)
681 | Expression::StddevSamp(_)
682 | Expression::Variance(_)
683 | Expression::VarPop(_)
684 | Expression::VarSamp(_)
685 | Expression::Median(_)
686 | Expression::Mode(_)
687 | Expression::First(_)
688 | Expression::Last(_)
689 | Expression::AnyValue(_)
690 | Expression::ApproxDistinct(_)
691 | Expression::ApproxCountDistinct(_)
692 | Expression::ApproxPercentile(_)
693 | Expression::Percentile(_)
694 | Expression::LogicalAnd(_)
695 | Expression::LogicalOr(_)
696 | Expression::Skewness(_)
697 | Expression::BitwiseCount(_)
698 | Expression::ArrayConcatAgg(_)
699 | Expression::ArrayUniqueAgg(_)
700 | Expression::BoolXorAgg(_)
701 | Expression::ParameterizedAgg(_)
702 | Expression::ArgMax(_)
703 | Expression::ArgMin(_)
704 | Expression::ApproxTopK(_)
705 | Expression::ApproxTopKAccumulate(_)
706 | Expression::ApproxTopKCombine(_)
707 | Expression::ApproxTopKEstimate(_)
708 | Expression::ApproxTopSum(_)
709 | Expression::ApproxQuantiles(_)
710 | Expression::Grouping(_)
711 | Expression::GroupingId(_)
712 | Expression::AnonymousAggFunc(_)
713 | Expression::CombinedAggFunc(_)
714 | Expression::CombinedParameterizedAgg(_)
715 | Expression::HashAgg(_)
716 | Expression::ObjectAgg(_)
717 | Expression::AIAgg(_)
718 )
719 })
720}
721
722fn collect_input_datasets(
723 expr: &Expression,
724 options: &OpenLineageOptions,
725 output: Option<&OpenLineageDatasetId>,
726 warnings: &mut Vec<OpenLineageWarning>,
727) -> Result<Vec<OpenLineageDatasetId>> {
728 let cte_aliases = collect_cte_aliases(expr, options.dialect);
729 let mut seen = BTreeSet::new();
730 let mut result = Vec::new();
731
732 for table in expr.dfs().filter_map(|node| match node {
733 Expression::Table(table) => Some(table),
734 _ => None,
735 }) {
736 let qname = table_ref_qualified_name(table);
737 let normalized = normalize_identifier(&table.name.name, options.dialect, true);
738 if cte_aliases.contains(&normalized) {
739 continue;
740 }
741 if output
742 .map(|out| out.name == qname || out.name == table.name.name)
743 .unwrap_or(false)
744 {
745 continue;
746 }
747 match dataset_from_table_name(&qname, options) {
748 Ok(dataset) => {
749 if seen.insert((dataset.namespace.clone(), dataset.name.clone())) {
750 result.push(dataset);
751 }
752 }
753 Err(err) => warnings.push(OpenLineageWarning::new(
754 "W_UNRESOLVED_DATASET",
755 format!("Could not map input table '{qname}': {err}"),
756 )),
757 }
758 }
759
760 Ok(result)
761}
762
763fn attach_output_facets(
764 output: &mut OpenLineageDataset,
765 output_id: &OpenLineageDatasetId,
766 options: &OpenLineageOptions,
767 fields: &BTreeMap<String, ColumnLineageField>,
768) -> Result<()> {
769 let column_lineage = ColumnLineageDatasetFacet {
770 producer: options.producer.clone(),
771 schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
772 fields: fields.clone(),
773 };
774 output.facets.insert(
775 "columnLineage".to_string(),
776 serde_json::to_value(column_lineage).map_err(openlineage_serialization_error)?,
777 );
778
779 if let Some(schema_facet) = schema_facet_for_dataset(output_id, options) {
780 output.facets.insert(
781 "schema".to_string(),
782 serde_json::to_value(schema_facet).map_err(openlineage_serialization_error)?,
783 );
784 }
785
786 Ok(())
787}
788
789fn job_facets(sql: &str, options: &OpenLineageOptions) -> Value {
790 json!({
791 "sql": {
792 "_producer": options.producer,
793 "_schemaURL": SQL_JOB_FACET_SCHEMA_URL,
794 "query": sql,
795 "dialect": options.dialect.to_string(),
796 },
797 "jobType": {
798 "_producer": options.producer,
799 "_schemaURL": JOB_TYPE_JOB_FACET_SCHEMA_URL,
800 "processingType": "BATCH",
801 "integration": "POLYGLOT_SQL",
802 "jobType": "QUERY",
803 }
804 })
805}
806
/// OpenLineage `schema` dataset facet built from user-supplied schema metadata.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacet {
    #[serde(rename = "_producer")]
    producer: String,
    #[serde(rename = "_schemaURL")]
    schema_url: String,
    /// Columns in declaration order.
    fields: Vec<SchemaDatasetFacetField>,
}
815
/// One column entry of the `schema` dataset facet.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacetField {
    name: String,
    /// Column type, serialized as `type`; omitted when empty.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    #[serde(rename = "type")]
    data_type: String,
    /// 1-based column position; omitted when `None`.
    // NOTE(review): there is no `rename_all`, so this serializes as `ordinal_position`
    // (snake_case), unlike the camelCase keys used elsewhere. Confirm against the consumer.
    #[serde(skip_serializing_if = "Option::is_none")]
    ordinal_position: Option<usize>,
}
825
826fn schema_facet_for_dataset(
827 output: &OpenLineageDatasetId,
828 options: &OpenLineageOptions,
829) -> Option<SchemaDatasetFacet> {
830 let schema = options.schema.as_ref()?;
831 let table = schema.tables.iter().find(|table| {
832 let qname = if let Some(schema_name) = &table.schema {
833 format!("{}.{}", schema_name, table.name)
834 } else {
835 table.name.clone()
836 };
837 output.name == table.name || output.name == qname
838 })?;
839
840 Some(SchemaDatasetFacet {
841 producer: options.producer.clone(),
842 schema_url: SCHEMA_DATASET_FACET_SCHEMA_URL.to_string(),
843 fields: table
844 .columns
845 .iter()
846 .enumerate()
847 .map(|(idx, col)| SchemaDatasetFacetField {
848 name: col.name.clone(),
849 data_type: col.data_type.clone(),
850 ordinal_position: Some(idx + 1),
851 })
852 .collect(),
853 })
854}
855
856fn expand_star_output_fields(
857 select: &Select,
858 star_expr: &Expression,
859 schema: Option<&dyn Schema>,
860 warnings: &mut Vec<OpenLineageWarning>,
861 fields: &mut Vec<OutputField>,
862) {
863 let Some(schema) = schema else {
864 warnings.push(OpenLineageWarning::new(
865 "W_STAR_WITHOUT_SCHEMA",
866 "SELECT * cannot be expanded into OpenLineage column lineage without schema metadata",
867 ));
868 return;
869 };
870
871 let qualifier = star_qualifier(star_expr);
872 let sources = select_source_tables(select);
873 for (alias, qname) in sources {
874 if qualifier
875 .as_ref()
876 .map(|q| q != &alias && q != &qname)
877 .unwrap_or(false)
878 {
879 continue;
880 }
881 match schema.column_names(&qname) {
882 Ok(columns) => {
883 for name in columns {
884 fields.push(OutputField {
885 lineage_name: name.clone(),
886 name,
887 expression: None,
888 star_source_table: Some(qname.clone()),
889 });
890 }
891 }
892 Err(err) => warnings.push(OpenLineageWarning::new(
893 "W_STAR_SCHEMA_LOOKUP_FAILED",
894 format!("Could not expand SELECT * for table '{}': {}", qname, err),
895 )),
896 }
897 }
898}
899
900fn select_source_tables(select: &Select) -> Vec<(String, String)> {
901 let mut result = Vec::new();
902 if let Some(from) = &select.from {
903 for expr in &from.expressions {
904 collect_source_table(expr, &mut result);
905 }
906 }
907 for join in &select.joins {
908 collect_source_table(&join.this, &mut result);
909 }
910 result
911}
912
913fn collect_source_table(expr: &Expression, result: &mut Vec<(String, String)>) {
914 match expr {
915 Expression::Table(table) => {
916 let qname = table_ref_qualified_name(table);
917 let alias = table
918 .alias
919 .as_ref()
920 .map(|a| a.name.clone())
921 .unwrap_or_else(|| table.name.name.clone());
922 result.push((alias, qname));
923 }
924 Expression::Alias(alias) => collect_source_table(&alias.this, result),
925 Expression::Paren(paren) => collect_source_table(&paren.this, result),
926 _ => {}
927 }
928}
929
930fn leftmost_select(expr: &Expression) -> Option<&Select> {
931 match expr {
932 Expression::Prepare(prepare) => leftmost_select(&prepare.statement),
933 Expression::Select(select) => Some(select),
934 Expression::Union(union) => leftmost_select(&union.left),
935 Expression::Intersect(intersect) => leftmost_select(&intersect.left),
936 Expression::Except(except) => leftmost_select(&except.left),
937 Expression::Subquery(subquery) => leftmost_select(&subquery.this),
938 _ => None,
939 }
940}
941
942fn output_name(expr: &Expression) -> Option<String> {
943 match expr {
944 Expression::Alias(alias) => Some(alias.alias.name.clone()),
945 Expression::Column(col) => Some(col.name.name.clone()),
946 Expression::Identifier(id) => Some(id.name.clone()),
947 Expression::Annotated(a) => output_name(&a.this),
948 _ => None,
949 }
950}
951
952fn unalias(expr: &Expression) -> &Expression {
953 match expr {
954 Expression::Alias(alias) => &alias.this,
955 Expression::Annotated(a) => unalias(&a.this),
956 _ => expr,
957 }
958}
959
960fn is_star_expr(expr: &Expression) -> bool {
961 matches!(expr, Expression::Star(_))
962 || matches!(expr, Expression::Column(col) if col.name.name == "*")
963}
964
965fn star_qualifier(expr: &Expression) -> Option<String> {
966 match expr {
967 Expression::Star(star) => star.table.as_ref().map(|t| t.name.clone()),
968 Expression::Column(col) if col.name.name == "*" => {
969 col.table.as_ref().map(|t| t.name.clone())
970 }
971 _ => None,
972 }
973}
974
975fn dataset_from_expression(
976 expr: &Expression,
977 options: &OpenLineageOptions,
978) -> Result<OpenLineageDatasetId> {
979 match expr {
980 Expression::Table(table) => dataset_from_table_ref(table, options),
981 Expression::Identifier(id) => dataset_from_table_name(&id.name, options),
982 _ => Err(Error::unsupported(
983 "OpenLineage dataset extraction from non-table expression",
984 options.dialect.to_string(),
985 )),
986 }
987}
988
989fn dataset_from_table_ref(
990 table: &TableRef,
991 options: &OpenLineageOptions,
992) -> Result<OpenLineageDatasetId> {
993 dataset_from_table_name(&table_ref_qualified_name(table), options)
994}
995
996fn dataset_from_table_name(
997 table_name: &str,
998 options: &OpenLineageOptions,
999) -> Result<OpenLineageDatasetId> {
1000 if let Some(mapped) = options.dataset_mappings.get(table_name) {
1001 return Ok(mapped.clone());
1002 }
1003 let namespace = options.dataset_namespace.as_ref().ok_or_else(|| {
1004 Error::parse(
1005 format!(
1006 "Missing datasetNamespace or explicit dataset mapping for table '{}'",
1007 table_name
1008 ),
1009 0,
1010 0,
1011 0,
1012 0,
1013 )
1014 })?;
1015 Ok(OpenLineageDatasetId::new(namespace, table_name))
1016}
1017
1018fn table_ref_qualified_name(table: &TableRef) -> String {
1019 let mut parts = Vec::new();
1020 if let Some(catalog) = &table.catalog {
1021 parts.push(catalog.name.clone());
1022 }
1023 if let Some(schema) = &table.schema {
1024 parts.push(schema.name.clone());
1025 }
1026 parts.push(table.name.name.clone());
1027 parts.join(".")
1028}
1029
1030fn collect_cte_aliases(expr: &Expression, dialect: DialectType) -> HashSet<String> {
1031 let mut aliases = HashSet::new();
1032 for node in expr.dfs() {
1033 match node {
1034 Expression::Select(select) => {
1035 if let Some(with) = &select.with {
1036 collect_with_aliases(with, dialect, &mut aliases);
1037 }
1038 }
1039 Expression::Union(union) => {
1040 if let Some(with) = &union.with {
1041 collect_with_aliases(with, dialect, &mut aliases);
1042 }
1043 }
1044 Expression::Intersect(intersect) => {
1045 if let Some(with) = &intersect.with {
1046 collect_with_aliases(with, dialect, &mut aliases);
1047 }
1048 }
1049 Expression::Except(except) => {
1050 if let Some(with) = &except.with {
1051 collect_with_aliases(with, dialect, &mut aliases);
1052 }
1053 }
1054 _ => {}
1055 }
1056 }
1057 aliases
1058}
1059
1060fn collect_with_aliases(with: &With, dialect: DialectType, aliases: &mut HashSet<String>) {
1061 for cte in &with.ctes {
1062 aliases.insert(normalize_identifier(&cte.alias.name, dialect, true));
1063 }
1064}
1065
1066fn normalize_identifier(name: &str, dialect: DialectType, is_table: bool) -> String {
1067 crate::schema::normalize_name(name, Some(dialect), is_table, true)
1068}
1069
1070fn openlineage_serialization_error(err: serde_json::Error) -> Error {
1071 Error::internal(format!("OpenLineage serialization failed: {err}"))
1072}
1073
1074fn deserialize_dialect_type<'de, D>(deserializer: D) -> std::result::Result<DialectType, D::Error>
1075where
1076 D: Deserializer<'de>,
1077{
1078 let value = String::deserialize(deserializer)?;
1079 value.parse::<DialectType>().map_err(de::Error::custom)
1080}
1081
1082#[cfg(test)]
1083mod tests {
1084 use super::*;
1085
1086 fn options() -> OpenLineageOptions {
1087 OpenLineageOptions {
1088 dialect: DialectType::PostgreSQL,
1089 producer: "https://github.com/tobilg/polyglot".to_string(),
1090 dataset_namespace: Some("postgres://warehouse".to_string()),
1091 output_dataset: Some(OpenLineageDatasetId::new(
1092 "postgres://warehouse",
1093 "analytics.out",
1094 )),
1095 job_namespace: Some("polyglot-tests".to_string()),
1096 job_name: Some("lineage-test".to_string()),
1097 event_time: Some("2026-05-18T00:00:00Z".to_string()),
1098 run_id: Some("3b452093-782c-4ef2-9c0c-aafe2aa6f34d".to_string()),
1099 event_type: Some(OpenLineageRunEventType::Complete),
1100 ..Default::default()
1101 }
1102 }
1103
1104 #[test]
1105 fn deserializes_dialect_aliases_in_options() {
1106 let options: OpenLineageOptions =
1107 serde_json::from_str(r#"{"producer":"polyglot","dialect":"postgres"}"#)
1108 .expect("options");
1109 assert_eq!(options.dialect, DialectType::PostgreSQL);
1110 }
1111
1112 #[test]
1113 fn emits_identity_column_lineage_for_select() {
1114 let result = openlineage_column_lineage("SELECT a FROM t", &options()).expect("lineage");
1115 let field = result.facet.fields.get("a").expect("field a");
1116 assert_eq!(field.input_fields.len(), 1);
1117 assert_eq!(field.input_fields[0].name, "t");
1118 assert_eq!(field.input_fields[0].field, "a");
1119 assert_eq!(field.input_fields[0].transformations[0].subtype, "IDENTITY");
1120 }
1121
1122 #[test]
1123 fn emits_column_lineage_for_prepared_statement_body() {
1124 let result = openlineage_column_lineage(
1125 "PREPARE leak AS SELECT id FROM sensitive_table WHERE id = $1",
1126 &options(),
1127 )
1128 .expect("lineage");
1129 let field = result.facet.fields.get("id").expect("field id");
1130 assert_eq!(field.input_fields.len(), 1);
1131 assert_eq!(field.input_fields[0].name, "sensitive_table");
1132 assert_eq!(field.input_fields[0].field, "id");
1133 }
1134
1135 #[test]
1136 fn resolves_input_dataset_behind_table_alias() {
1137 let result = openlineage_column_lineage("SELECT o.total FROM orders o", &options())
1138 .expect("lineage");
1139 let field = result.facet.fields.get("total").expect("field total");
1140 assert_eq!(field.input_fields[0].name, "orders");
1141 assert_eq!(field.input_fields[0].field, "total");
1142 }
1143
1144 #[test]
1145 fn emits_transformation_column_lineage_for_expression() {
1146 let result =
1147 openlineage_column_lineage("SELECT a + b AS c FROM t", &options()).expect("lineage");
1148 let field = result.facet.fields.get("c").expect("field c");
1149 assert_eq!(field.input_fields.len(), 2);
1150 assert!(field.input_fields.iter().any(|f| f.field == "a"));
1151 assert!(field.input_fields.iter().any(|f| f.field == "b"));
1152 assert!(field
1153 .input_fields
1154 .iter()
1155 .all(|f| f.transformations[0].subtype == "TRANSFORMATION"));
1156 }
1157
1158 #[test]
1159 fn omits_bigquery_safe_namespace_from_column_lineage_issue207() {
1160 let mut opts = options();
1161 opts.dialect = DialectType::BigQuery;
1162
1163 let result = openlineage_column_lineage(
1164 r#"
1165WITH import_cte AS (
1166 SELECT timestamp, data, operation
1167 FROM `project`.`dataset`.`source_table`
1168),
1169transform_cte AS (
1170 SELECT
1171 timestamp,
1172 SAFE.PARSE_JSON(data) AS json_data
1173 FROM import_cte
1174)
1175SELECT json_data FROM transform_cte
1176"#,
1177 &opts,
1178 )
1179 .expect("lineage");
1180 let field = result.facet.fields.get("json_data").expect("json_data");
1181
1182 assert!(
1183 field.input_fields.iter().any(|input| input.field == "data"),
1184 "expected data input field, got {:?}",
1185 field.input_fields
1186 );
1187 assert!(
1188 !field
1189 .input_fields
1190 .iter()
1191 .any(|input| input.field.eq_ignore_ascii_case("safe")),
1192 "did not expect SAFE namespace as input field, got {:?}",
1193 field.input_fields
1194 );
1195 }
1196
1197 #[test]
1198 fn emits_bigquery_unnest_alias_column_lineage_issue209() {
1199 let mut opts = options();
1200 opts.dialect = DialectType::BigQuery;
1201 opts.dataset_namespace = Some("bigquery://warehouse".to_string());
1202 opts.output_dataset = Some(OpenLineageDatasetId::new(
1203 "bigquery://warehouse",
1204 "calendar",
1205 ));
1206
1207 let result = openlineage_column_lineage(
1208 r#"
1209SELECT date_val AS week_start
1210FROM UNNEST(GENERATE_DATE_ARRAY('2024-01-01', '2024-12-31', INTERVAL 1 WEEK)) AS date_val
1211"#,
1212 &opts,
1213 )
1214 .expect("lineage");
1215 let field = result.facet.fields.get("week_start").expect("week_start");
1216
1217 assert!(field.input_fields.is_empty());
1218 assert!(
1219 result
1220 .warnings
1221 .iter()
1222 .all(|warning| warning.code != "W_EMPTY_FIELD_LINEAGE"),
1223 "did not expect empty-lineage warning, got {:?}",
1224 result.warnings
1225 );
1226 }
1227
1228 #[test]
1229 fn emits_bigquery_table_backed_unnest_column_lineage() {
1230 let mut opts = options();
1231 opts.dialect = DialectType::BigQuery;
1232 opts.dataset_namespace = Some("bigquery://warehouse".to_string());
1233 opts.output_dataset = Some(OpenLineageDatasetId::new("bigquery://warehouse", "items"));
1234
1235 let result = openlineage_column_lineage(
1236 r#"
1237SELECT item.item AS item
1238FROM t JOIN UNNEST(t.items) AS item ON TRUE
1239"#,
1240 &opts,
1241 )
1242 .expect("lineage");
1243 let field = result.facet.fields.get("item").expect("item");
1244
1245 assert_eq!(field.input_fields.len(), 1);
1246 assert_eq!(field.input_fields[0].name, "t");
1247 assert_eq!(field.input_fields[0].field, "items");
1248 }
1249
1250 #[test]
1251 fn emits_aggregation_column_lineage() {
1252 let result =
1253 openlineage_column_lineage("SELECT SUM(amount) AS total FROM orders", &options())
1254 .expect("lineage");
1255 let field = result.facet.fields.get("total").expect("field total");
1256 assert_eq!(field.input_fields[0].field, "amount");
1257 assert_eq!(
1258 field.input_fields[0].transformations[0].subtype,
1259 "AGGREGATION"
1260 );
1261 }
1262
1263 #[test]
1264 fn infers_insert_output_dataset() {
1265 let mut opts = options();
1266 opts.output_dataset = None;
1267 let result =
1268 openlineage_column_lineage("INSERT INTO analytics.out SELECT a FROM raw.input", &opts)
1269 .expect("lineage");
1270 assert_eq!(result.outputs[0].name, "analytics.out");
1271 assert_eq!(result.inputs[0].name, "raw.input");
1272 }
1273
1274 #[test]
1275 fn maps_insert_target_columns_to_output_fields() {
1276 let mut opts = options();
1277 opts.output_dataset = None;
1278 let result = openlineage_column_lineage(
1279 "INSERT INTO analytics.out (target_a) SELECT source_a FROM raw.input",
1280 &opts,
1281 )
1282 .expect("lineage");
1283 let field = result.facet.fields.get("target_a").expect("target field");
1284 assert_eq!(field.input_fields[0].field, "source_a");
1285 assert!(!result.facet.fields.contains_key("source_a"));
1286 }
1287
1288 #[test]
1289 fn pure_select_requires_output_dataset() {
1290 let mut opts = options();
1291 opts.output_dataset = None;
1292 let err = openlineage_column_lineage("SELECT a FROM t", &opts).unwrap_err();
1293 assert!(err.to_string().contains("outputDataset is required"));
1294 }
1295
1296 #[test]
1297 fn emits_job_event_payload() {
1298 let result = openlineage_job_event("SELECT a FROM t", &options()).expect("event");
1299 assert_eq!(result.event["job"]["namespace"], "polyglot-tests");
1300 assert_eq!(
1301 result.event["job"]["facets"]["sql"]["_schemaURL"],
1302 SQL_JOB_FACET_SCHEMA_URL
1303 );
1304 assert_eq!(
1305 result.event["outputs"][0]["facets"]["columnLineage"]["fields"]["a"]["inputFields"][0]
1306 ["field"],
1307 "a"
1308 );
1309 }
1310
1311 #[test]
1312 fn emits_run_event_payload() {
1313 let result = openlineage_run_event("SELECT a FROM t", &options()).expect("event");
1314 assert_eq!(result.event["eventType"], "COMPLETE");
1315 assert_eq!(
1316 result.event["run"]["runId"],
1317 "3b452093-782c-4ef2-9c0c-aafe2aa6f34d"
1318 );
1319 }
1320
1321 #[test]
1322 fn select_star_without_schema_warns() {
1323 let result = openlineage_column_lineage("SELECT * FROM t", &options()).expect("lineage");
1324 assert!(result.facet.fields.is_empty());
1325 assert!(result
1326 .warnings
1327 .iter()
1328 .any(|w| w.code == "W_STAR_WITHOUT_SCHEMA"));
1329 }
1330
1331 #[test]
1332 fn select_star_with_schema_expands_fields() {
1333 let mut opts = options();
1334 opts.schema = Some(ValidationSchema {
1335 strict: None,
1336 tables: vec![crate::validation::SchemaTable {
1337 name: "t".to_string(),
1338 schema: None,
1339 columns: vec![
1340 crate::validation::SchemaColumn {
1341 name: "a".to_string(),
1342 data_type: "INT".to_string(),
1343 nullable: None,
1344 primary_key: false,
1345 unique: false,
1346 references: None,
1347 },
1348 crate::validation::SchemaColumn {
1349 name: "b".to_string(),
1350 data_type: "TEXT".to_string(),
1351 nullable: None,
1352 primary_key: false,
1353 unique: false,
1354 references: None,
1355 },
1356 ],
1357 aliases: vec![],
1358 primary_key: vec![],
1359 unique_keys: vec![],
1360 foreign_keys: vec![],
1361 }],
1362 });
1363
1364 let result = openlineage_column_lineage("SELECT * FROM t", &opts).expect("lineage");
1365 assert!(result.facet.fields.contains_key("a"));
1366 assert!(result.facet.fields.contains_key("b"));
1367 }
1368}