1use crate::dialects::{Dialect, DialectType};
8use crate::expressions::*;
9use crate::lineage::{self, LineageNode};
10use crate::schema::Schema;
11use crate::traversal::ExpressionWalk;
12use crate::{mapping_schema_from_validation_schema, Error, Result, ValidationSchema};
13use serde::de::{self, Deserializer};
14use serde::{Deserialize, Serialize};
15use serde_json::{json, Value};
16use std::collections::{BTreeMap, BTreeSet, HashSet};
17
/// Core OpenLineage event schema URL; emitted as `schemaURL` on generated events.
pub const OPENLINEAGE_SCHEMA_URL: &str = "https://openlineage.io/spec/2-0-2/OpenLineage.json";
/// Schema URL emitted as `_schemaURL` on the column lineage dataset facet.
pub const COLUMN_LINEAGE_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json";
/// Schema URL emitted as `_schemaURL` on the `sql` job facet.
pub const SQL_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-1-0/SQLJobFacet.json";
/// Schema URL emitted as `_schemaURL` on the `jobType` job facet.
pub const JOB_TYPE_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json";
/// Schema URL emitted as `_schemaURL` on the `schema` dataset facet.
pub const SCHEMA_DATASET_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/SchemaDatasetFacet.json";
27
/// Namespace/name pair identifying an OpenLineage dataset.
///
/// Orderable and comparable so it can be used in sorted collections.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageDatasetId {
    /// Dataset namespace.
    pub namespace: String,
    /// Dataset name within the namespace.
    pub name: String,
}
35
36impl OpenLineageDatasetId {
37 pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
38 Self {
39 namespace: namespace.into(),
40 name: name.into(),
41 }
42 }
43}
44
/// Options controlling OpenLineage generation.
///
/// Every field falls back to its `Default` value when absent during
/// deserialization (container-level `#[serde(default)]`).
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct OpenLineageOptions {
    /// SQL dialect used for parsing; deserialized from a string via `FromStr`.
    #[serde(deserialize_with = "deserialize_dialect_type")]
    pub dialect: DialectType,
    /// Producer identifier; must not be blank. Emitted as `producer` / `_producer`.
    pub producer: String,
    /// Namespace used for tables that have no explicit entry in `dataset_mappings`.
    pub dataset_namespace: Option<String>,
    /// Explicit table-name → dataset overrides, keyed by the dotted table name.
    pub dataset_mappings: BTreeMap<String, OpenLineageDatasetId>,
    /// Output dataset; required for `SELECT` statements without `SELECT INTO`.
    pub output_dataset: Option<OpenLineageDatasetId>,
    /// Optional schema metadata, used for `SELECT *` expansion and the schema facet.
    pub schema: Option<ValidationSchema>,
    /// Job namespace; required by the job and run event builders.
    pub job_namespace: Option<String>,
    /// Job name; required by the job and run event builders.
    pub job_name: Option<String>,
    /// Event timestamp string, passed through as-is; required by the event builders.
    pub event_time: Option<String>,
    /// Run identifier; required by the run event builder.
    pub run_id: Option<String>,
    /// Run event type; required by the run event builder.
    pub event_type: Option<OpenLineageRunEventType>,
}
62
/// Run event type; serialized in `SCREAMING_SNAKE_CASE` (e.g. `COMPLETE`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum OpenLineageRunEventType {
    Start,
    Running,
    Complete,
    Abort,
    Fail,
    Other,
}
74
/// Non-fatal diagnostic collected while generating lineage.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageWarning {
    /// Machine-readable warning code (e.g. `W_UNRESOLVED_DATASET`).
    pub code: String,
    /// Human-readable description of the problem.
    pub message: String,
}
82
83impl OpenLineageWarning {
84 fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
85 Self {
86 code: code.into(),
87 message: message.into(),
88 }
89 }
90}
91
/// Result of column lineage generation for a single SQL statement.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageColumnLineageResult {
    /// Column lineage facet describing every output field.
    pub facet: ColumnLineageDatasetFacet,
    /// Datasets read by the statement.
    pub inputs: Vec<OpenLineageDataset>,
    /// Datasets written by the statement, with their facets attached.
    pub outputs: Vec<OpenLineageDataset>,
    /// Non-fatal problems encountered during generation.
    pub warnings: Vec<OpenLineageWarning>,
}
100
/// Result of building an OpenLineage job or run event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageEventResult {
    /// The event as a JSON value.
    pub event: Value,
    /// Non-fatal problems encountered during generation.
    pub warnings: Vec<OpenLineageWarning>,
}
107
/// Dataset entry with optional facets.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OpenLineageDataset {
    /// Dataset namespace.
    pub namespace: String,
    /// Dataset name within the namespace.
    pub name: String,
    /// Facets keyed by facet name; omitted from serialized output when empty
    /// and defaulted to empty when absent on input.
    #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
    pub facets: BTreeMap<String, Value>,
}
115
116impl std::convert::From<OpenLineageDatasetId> for OpenLineageDataset {
117 fn from(id: OpenLineageDatasetId) -> Self {
118 Self {
119 namespace: id.namespace,
120 name: id.name,
121 facets: BTreeMap::new(),
122 }
123 }
124}
125
/// Column lineage dataset facet.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnLineageDatasetFacet {
    /// Producer identifier; serialized as `_producer`.
    #[serde(rename = "_producer")]
    pub producer: String,
    /// Facet schema URL; serialized as `_schemaURL`.
    #[serde(rename = "_schemaURL")]
    pub schema_url: String,
    /// Lineage per output field, keyed by output field name.
    pub fields: BTreeMap<String, ColumnLineageField>,
}
134
/// Lineage of a single output field.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnLineageField {
    /// Input fields this output field is derived from; serialized as `inputFields`.
    pub input_fields: Vec<OpenLineageInputField>,
}
140
/// Input field (dataset + column) contributing to an output field.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageInputField {
    /// Namespace of the input dataset.
    pub namespace: String,
    /// Name of the input dataset.
    pub name: String,
    /// Column name within the input dataset.
    pub field: String,
    /// Transformations applied to the input; omitted from serialized output
    /// when empty and defaulted to empty when absent on input.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub transformations: Vec<OpenLineageTransformation>,
}
150
/// Description of how an input field is transformed into an output field.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OpenLineageTransformation {
    /// Transformation type; serialized as `type`.
    #[serde(rename = "type")]
    pub type_: String,
    /// Transformation subtype (this module emits `IDENTITY`, `TRANSFORMATION`
    /// or `AGGREGATION`).
    pub subtype: String,
    /// Optional textual description; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Optional masking flag; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub masking: Option<bool>,
}
161
/// Internal summary of a statement: its query, its datasets and target columns.
#[derive(Debug, Clone)]
struct StatementAnalysis {
    /// The query expression whose projections define the output fields.
    query: Expression,
    /// Datasets read by the statement.
    inputs: Vec<OpenLineageDatasetId>,
    /// Dataset written by the statement.
    output: OpenLineageDatasetId,
    /// Explicit target column names (INSERT column list / CREATE TABLE columns);
    /// empty when the statement declares none.
    output_column_names: Vec<String>,
}
169
/// Internal description of one projected output column.
#[derive(Debug, Clone)]
struct OutputField {
    /// Name under which the field is reported (may be overridden by target columns).
    name: String,
    /// Column name used when asking the lineage engine about this field.
    lineage_name: String,
    /// Projection expression; `None` for fields produced by `SELECT *` expansion.
    expression: Option<Expression>,
    /// Source table for fields produced by `SELECT *` expansion.
    star_source_table: Option<String>,
}
177
/// A source column (table + field) found at a leaf of a lineage tree.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct TerminalField {
    /// Table name the column belongs to.
    table: String,
    /// Column name.
    field: String,
}
183
184pub fn openlineage_column_lineage(
186 sql: &str,
187 options: &OpenLineageOptions,
188) -> Result<OpenLineageColumnLineageResult> {
189 validate_common_options(options)?;
190
191 let mut warnings = Vec::new();
192 let schema_mapping = options
193 .schema
194 .as_ref()
195 .map(mapping_schema_from_validation_schema);
196 let dialect = Dialect::get(options.dialect);
197 let mut expressions = dialect.parse(sql)?;
198 if expressions.len() != 1 {
199 return Err(Error::parse(
200 format!(
201 "OpenLineage generation expects exactly one statement, found {}",
202 expressions.len()
203 ),
204 0,
205 0,
206 0,
207 0,
208 ));
209 }
210
211 let expr = expressions.remove(0);
212 let analysis = analyze_statement(&expr, options, &mut warnings)?;
213 let mut output_fields = output_fields_for_query(
214 &analysis.query,
215 schema_mapping.as_ref().map(|s| s as &dyn Schema),
216 options.dialect,
217 &mut warnings,
218 )?;
219 apply_output_column_names(
220 &mut output_fields,
221 &analysis.output_column_names,
222 &mut warnings,
223 );
224
225 let mut fields = BTreeMap::new();
226 for output_field in output_fields {
227 if fields.contains_key(&output_field.name) {
228 warnings.push(OpenLineageWarning::new(
229 "W_DUPLICATE_OUTPUT_FIELD",
230 format!(
231 "Duplicate output field '{}' was merged in the OpenLineage fields map",
232 output_field.name
233 ),
234 ));
235 }
236
237 let input_fields = input_fields_for_output(
238 &analysis.query,
239 &output_field,
240 options,
241 schema_mapping.as_ref().map(|s| s as &dyn Schema),
242 &mut warnings,
243 )?;
244
245 fields.insert(output_field.name, ColumnLineageField { input_fields });
246 }
247
248 let mut outputs = vec![OpenLineageDataset::from(analysis.output.clone())];
249 attach_output_facets(&mut outputs[0], &analysis.output, options, &fields)?;
250
251 Ok(OpenLineageColumnLineageResult {
252 facet: ColumnLineageDatasetFacet {
253 producer: options.producer.clone(),
254 schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
255 fields,
256 },
257 inputs: analysis
258 .inputs
259 .into_iter()
260 .map(OpenLineageDataset::from)
261 .collect(),
262 outputs,
263 warnings,
264 })
265}
266
267pub fn openlineage_job_event(
269 sql: &str,
270 options: &OpenLineageOptions,
271) -> Result<OpenLineageEventResult> {
272 let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
273 let job_name = required_option(&options.job_name, "jobName")?;
274 let event_time = required_option(&options.event_time, "eventTime")?;
275
276 let result = openlineage_column_lineage(sql, options)?;
277 let event = json!({
278 "eventTime": event_time,
279 "producer": options.producer,
280 "schemaURL": OPENLINEAGE_SCHEMA_URL,
281 "job": {
282 "namespace": job_namespace,
283 "name": job_name,
284 "facets": job_facets(sql, options),
285 },
286 "inputs": result.inputs,
287 "outputs": result.outputs,
288 });
289
290 Ok(OpenLineageEventResult {
291 event,
292 warnings: result.warnings,
293 })
294}
295
296pub fn openlineage_run_event(
298 sql: &str,
299 options: &OpenLineageOptions,
300) -> Result<OpenLineageEventResult> {
301 let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
302 let job_name = required_option(&options.job_name, "jobName")?;
303 let event_time = required_option(&options.event_time, "eventTime")?;
304 let run_id = required_option(&options.run_id, "runId")?;
305 let event_type = options
306 .event_type
307 .ok_or_else(|| Error::parse("Missing required option: eventType", 0, 0, 0, 0))?;
308
309 let result = openlineage_column_lineage(sql, options)?;
310 let event = json!({
311 "eventTime": event_time,
312 "eventType": event_type,
313 "producer": options.producer,
314 "schemaURL": OPENLINEAGE_SCHEMA_URL,
315 "run": {
316 "runId": run_id,
317 "facets": {},
318 },
319 "job": {
320 "namespace": job_namespace,
321 "name": job_name,
322 "facets": job_facets(sql, options),
323 },
324 "inputs": result.inputs,
325 "outputs": result.outputs,
326 });
327
328 Ok(OpenLineageEventResult {
329 event,
330 warnings: result.warnings,
331 })
332}
333
334fn validate_common_options(options: &OpenLineageOptions) -> Result<()> {
335 if options.producer.trim().is_empty() {
336 return Err(Error::parse(
337 "Missing required option: producer",
338 0,
339 0,
340 0,
341 0,
342 ));
343 }
344 Ok(())
345}
346
347fn required_option(value: &Option<String>, name: &str) -> Result<String> {
348 match value.as_ref().filter(|v| !v.trim().is_empty()) {
349 Some(value) => Ok(value.clone()),
350 None => Err(Error::parse(
351 format!("Missing required option: {name}"),
352 0,
353 0,
354 0,
355 0,
356 )),
357 }
358}
359
360fn analyze_statement(
361 expr: &Expression,
362 options: &OpenLineageOptions,
363 warnings: &mut Vec<OpenLineageWarning>,
364) -> Result<StatementAnalysis> {
365 match expr {
366 Expression::Select(select) => {
367 let output = if let Some(into) = &select.into {
368 dataset_from_expression(&into.this, options)?
369 } else {
370 options.output_dataset.clone().ok_or_else(|| {
371 Error::parse(
372 "OpenLineage outputDataset is required for SELECT statements without SELECT INTO",
373 0,
374 0,
375 0,
376 0,
377 )
378 })?
379 };
380 Ok(StatementAnalysis {
381 query: expr.clone(),
382 inputs: collect_input_datasets(expr, options, Some(&output), warnings)?,
383 output,
384 output_column_names: Vec::new(),
385 })
386 }
387 Expression::Insert(insert) => {
388 let output = dataset_from_table_ref(&insert.table, options)?;
389 let query = insert.query.clone().ok_or_else(|| {
390 Error::unsupported(
391 "OpenLineage column lineage for INSERT without query",
392 options.dialect.to_string(),
393 )
394 })?;
395 Ok(StatementAnalysis {
396 inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
397 query,
398 output,
399 output_column_names: insert.columns.iter().map(|col| col.name.clone()).collect(),
400 })
401 }
402 Expression::CreateTable(create) => {
403 let output = dataset_from_table_ref(&create.name, options)?;
404 let query = create.as_select.clone().ok_or_else(|| {
405 Error::unsupported(
406 "OpenLineage column lineage for CREATE TABLE without AS SELECT",
407 options.dialect.to_string(),
408 )
409 })?;
410 Ok(StatementAnalysis {
411 inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
412 query,
413 output,
414 output_column_names: create
415 .columns
416 .iter()
417 .map(|col| col.name.name.clone())
418 .collect(),
419 })
420 }
421 _ => Err(Error::unsupported(
422 format!("OpenLineage generation for {}", expr.variant_name()),
423 options.dialect.to_string(),
424 )),
425 }
426}
427
428fn output_fields_for_query(
429 query: &Expression,
430 schema: Option<&dyn Schema>,
431 dialect: DialectType,
432 warnings: &mut Vec<OpenLineageWarning>,
433) -> Result<Vec<OutputField>> {
434 let select = leftmost_select(query).ok_or_else(|| {
435 Error::unsupported(
436 "OpenLineage output field extraction for non-SELECT query",
437 dialect.to_string(),
438 )
439 })?;
440
441 let mut fields = Vec::new();
442 for (idx, expr) in select.expressions.iter().enumerate() {
443 if is_star_expr(expr) {
444 expand_star_output_fields(select, expr, schema, warnings, &mut fields);
445 continue;
446 }
447
448 let name = output_name(expr).unwrap_or_else(|| format!("_{idx}"));
449 fields.push(OutputField {
450 lineage_name: name.clone(),
451 name,
452 expression: Some(expr.clone()),
453 star_source_table: None,
454 });
455 }
456 Ok(fields)
457}
458
459fn apply_output_column_names(
460 fields: &mut [OutputField],
461 output_column_names: &[String],
462 warnings: &mut Vec<OpenLineageWarning>,
463) {
464 if output_column_names.is_empty() {
465 return;
466 }
467 if output_column_names.len() != fields.len() {
468 warnings.push(OpenLineageWarning::new(
469 "W_OUTPUT_COLUMN_COUNT_MISMATCH",
470 format!(
471 "Target column count ({}) does not match projected column count ({})",
472 output_column_names.len(),
473 fields.len()
474 ),
475 ));
476 return;
477 }
478 for (field, output_name) in fields.iter_mut().zip(output_column_names) {
479 field.name = output_name.clone();
480 }
481}
482
483fn input_fields_for_output(
484 query: &Expression,
485 output_field: &OutputField,
486 options: &OpenLineageOptions,
487 schema: Option<&dyn Schema>,
488 warnings: &mut Vec<OpenLineageWarning>,
489) -> Result<Vec<OpenLineageInputField>> {
490 if let Some(table) = &output_field.star_source_table {
491 return terminal_fields_to_openlineage(
492 vec![TerminalField {
493 table: table.clone(),
494 field: output_field.lineage_name.clone(),
495 }],
496 "IDENTITY",
497 Some(format!("SELECT {}", output_field.lineage_name)),
498 options,
499 warnings,
500 );
501 }
502
503 let lineage_result = if let Some(schema) = schema {
504 lineage::lineage_with_schema(
505 &output_field.lineage_name,
506 query,
507 Some(schema),
508 Some(options.dialect),
509 false,
510 )
511 } else {
512 lineage::lineage(
513 &output_field.lineage_name,
514 query,
515 Some(options.dialect),
516 false,
517 )
518 };
519
520 let node = match lineage_result {
521 Ok(node) => node,
522 Err(err) => {
523 warnings.push(OpenLineageWarning::new(
524 "W_UNRESOLVED_OUTPUT_FIELD",
525 format!(
526 "Could not resolve lineage for output field '{}': {}",
527 output_field.name, err
528 ),
529 ));
530 return Ok(Vec::new());
531 }
532 };
533
534 let mut terminals = BTreeSet::new();
535 collect_terminal_fields(&node, &mut terminals);
536 let terminals: Vec<TerminalField> = terminals.into_iter().collect();
537
538 if terminals.is_empty() {
539 warnings.push(OpenLineageWarning::new(
540 "W_EMPTY_FIELD_LINEAGE",
541 format!(
542 "No input fields were found for output field '{}'",
543 output_field.name
544 ),
545 ));
546 return Ok(Vec::new());
547 }
548
549 let subtype = transformation_subtype(output_field.expression.as_ref(), &terminals);
550 let description = output_field
551 .expression
552 .as_ref()
553 .and_then(|expr| transformation_description(expr, options.dialect));
554
555 terminal_fields_to_openlineage(terminals, subtype, description, options, warnings)
556}
557
/// Returns the textual description attached to a transformation.
///
/// With the `generate` feature enabled this is `expr.sql_for(dialect)`;
/// without it no description is produced.
fn transformation_description(expr: &Expression, dialect: DialectType) -> Option<String> {
    #[cfg(feature = "generate")]
    {
        Some(expr.sql_for(dialect))
    }

    #[cfg(not(feature = "generate"))]
    {
        // Keep the parameters "used" so the build stays warning-free.
        let _ = (expr, dialect);
        None
    }
}
570
571fn terminal_fields_to_openlineage(
572 terminals: Vec<TerminalField>,
573 subtype: &str,
574 description: Option<String>,
575 options: &OpenLineageOptions,
576 warnings: &mut Vec<OpenLineageWarning>,
577) -> Result<Vec<OpenLineageInputField>> {
578 let mut result = Vec::new();
579 for terminal in terminals {
580 let dataset = dataset_from_table_name(&terminal.table, options).map_err(|err| {
581 warnings.push(OpenLineageWarning::new(
582 "W_UNRESOLVED_DATASET",
583 format!(
584 "Could not map table '{}' to an OpenLineage dataset: {}",
585 terminal.table, err
586 ),
587 ));
588 err
589 })?;
590 result.push(OpenLineageInputField {
591 namespace: dataset.namespace,
592 name: dataset.name,
593 field: terminal.field,
594 transformations: vec![OpenLineageTransformation {
595 type_: "DIRECT".to_string(),
596 subtype: subtype.to_string(),
597 description: description.clone(),
598 masking: Some(false),
599 }],
600 });
601 }
602 Ok(result)
603}
604
605fn transformation_subtype(expr: Option<&Expression>, terminals: &[TerminalField]) -> &'static str {
606 let Some(expr) = expr else {
607 return "TRANSFORMATION";
608 };
609 let unaliased = unalias(expr);
610 if expression_contains_aggregate(unaliased) {
611 return "AGGREGATION";
612 }
613 if terminals.len() == 1 {
614 if let Expression::Column(col) = unaliased {
615 if col.name.name == terminals[0].field {
616 return "IDENTITY";
617 }
618 }
619 }
620 "TRANSFORMATION"
621}
622
623fn collect_terminal_fields(node: &LineageNode, terminals: &mut BTreeSet<TerminalField>) {
624 if node.downstream.is_empty() {
625 if let Expression::Column(column) = &node.expression {
626 let table = if !node.source_name.is_empty() {
627 Some(node.source_name.clone())
628 } else if let Expression::Table(table) = &node.source {
629 Some(table_ref_qualified_name(table))
630 } else {
631 column.table.as_ref().map(|t| t.name.clone())
632 };
633 if let Some(table) = table.filter(|t| !t.is_empty()) {
634 terminals.insert(TerminalField {
635 table,
636 field: column.name.name.clone(),
637 });
638 }
639 }
640 return;
641 }
642
643 for child in &node.downstream {
644 collect_terminal_fields(child, terminals);
645 }
646}
647
/// Returns `true` when `expr` contains a node that is one of the aggregate
/// expression variants listed below, as reported by `Expression::contains`.
fn expression_contains_aggregate(expr: &Expression) -> bool {
    expr.contains(|node| {
        matches!(
            node,
            Expression::AggregateFunction(_)
                | Expression::Sum(_)
                | Expression::Count(_)
                | Expression::Avg(_)
                | Expression::Min(_)
                | Expression::Max(_)
                | Expression::GroupConcat(_)
                | Expression::StringAgg(_)
                | Expression::ListAgg(_)
                | Expression::ArrayAgg(_)
                | Expression::CountIf(_)
                | Expression::SumIf(_)
                | Expression::Stddev(_)
                | Expression::StddevPop(_)
                | Expression::StddevSamp(_)
                | Expression::Variance(_)
                | Expression::VarPop(_)
                | Expression::VarSamp(_)
                | Expression::Median(_)
                | Expression::Mode(_)
                | Expression::First(_)
                | Expression::Last(_)
                | Expression::AnyValue(_)
                | Expression::ApproxDistinct(_)
                | Expression::ApproxCountDistinct(_)
                | Expression::ApproxPercentile(_)
                | Expression::Percentile(_)
                | Expression::LogicalAnd(_)
                | Expression::LogicalOr(_)
                | Expression::Skewness(_)
                | Expression::BitwiseCount(_)
                | Expression::ArrayConcatAgg(_)
                | Expression::ArrayUniqueAgg(_)
                | Expression::BoolXorAgg(_)
                | Expression::ParameterizedAgg(_)
                | Expression::ArgMax(_)
                | Expression::ArgMin(_)
                | Expression::ApproxTopK(_)
                | Expression::ApproxTopKAccumulate(_)
                | Expression::ApproxTopKCombine(_)
                | Expression::ApproxTopKEstimate(_)
                | Expression::ApproxTopSum(_)
                | Expression::ApproxQuantiles(_)
                | Expression::Grouping(_)
                | Expression::GroupingId(_)
                | Expression::AnonymousAggFunc(_)
                | Expression::CombinedAggFunc(_)
                | Expression::CombinedParameterizedAgg(_)
                | Expression::HashAgg(_)
                | Expression::ObjectAgg(_)
                | Expression::AIAgg(_)
        )
    })
}
706
707fn collect_input_datasets(
708 expr: &Expression,
709 options: &OpenLineageOptions,
710 output: Option<&OpenLineageDatasetId>,
711 warnings: &mut Vec<OpenLineageWarning>,
712) -> Result<Vec<OpenLineageDatasetId>> {
713 let cte_aliases = collect_cte_aliases(expr, options.dialect);
714 let mut seen = BTreeSet::new();
715 let mut result = Vec::new();
716
717 for table in expr.dfs().filter_map(|node| match node {
718 Expression::Table(table) => Some(table),
719 _ => None,
720 }) {
721 let qname = table_ref_qualified_name(table);
722 let normalized = normalize_identifier(&table.name.name, options.dialect, true);
723 if cte_aliases.contains(&normalized) {
724 continue;
725 }
726 if output
727 .map(|out| out.name == qname || out.name == table.name.name)
728 .unwrap_or(false)
729 {
730 continue;
731 }
732 match dataset_from_table_name(&qname, options) {
733 Ok(dataset) => {
734 if seen.insert((dataset.namespace.clone(), dataset.name.clone())) {
735 result.push(dataset);
736 }
737 }
738 Err(err) => warnings.push(OpenLineageWarning::new(
739 "W_UNRESOLVED_DATASET",
740 format!("Could not map input table '{qname}': {err}"),
741 )),
742 }
743 }
744
745 Ok(result)
746}
747
748fn attach_output_facets(
749 output: &mut OpenLineageDataset,
750 output_id: &OpenLineageDatasetId,
751 options: &OpenLineageOptions,
752 fields: &BTreeMap<String, ColumnLineageField>,
753) -> Result<()> {
754 let column_lineage = ColumnLineageDatasetFacet {
755 producer: options.producer.clone(),
756 schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
757 fields: fields.clone(),
758 };
759 output.facets.insert(
760 "columnLineage".to_string(),
761 serde_json::to_value(column_lineage).map_err(openlineage_serialization_error)?,
762 );
763
764 if let Some(schema_facet) = schema_facet_for_dataset(output_id, options) {
765 output.facets.insert(
766 "schema".to_string(),
767 serde_json::to_value(schema_facet).map_err(openlineage_serialization_error)?,
768 );
769 }
770
771 Ok(())
772}
773
774fn job_facets(sql: &str, options: &OpenLineageOptions) -> Value {
775 json!({
776 "sql": {
777 "_producer": options.producer,
778 "_schemaURL": SQL_JOB_FACET_SCHEMA_URL,
779 "query": sql,
780 "dialect": options.dialect.to_string(),
781 },
782 "jobType": {
783 "_producer": options.producer,
784 "_schemaURL": JOB_TYPE_JOB_FACET_SCHEMA_URL,
785 "processingType": "BATCH",
786 "integration": "POLYGLOT_SQL",
787 "jobType": "QUERY",
788 }
789 })
790}
791
/// Schema dataset facet listing the columns of a dataset.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacet {
    /// Producer identifier; serialized as `_producer`.
    #[serde(rename = "_producer")]
    producer: String,
    /// Facet schema URL; serialized as `_schemaURL`.
    #[serde(rename = "_schemaURL")]
    schema_url: String,
    /// Columns of the dataset.
    fields: Vec<SchemaDatasetFacetField>,
}
800
/// One column entry of the schema dataset facet.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacetField {
    /// Column name.
    name: String,
    /// Column data type; serialized as `type`, omitted when empty and
    /// defaulted to empty when absent on input.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    #[serde(rename = "type")]
    data_type: String,
    /// Position of the column; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    ordinal_position: Option<usize>,
}
810
811fn schema_facet_for_dataset(
812 output: &OpenLineageDatasetId,
813 options: &OpenLineageOptions,
814) -> Option<SchemaDatasetFacet> {
815 let schema = options.schema.as_ref()?;
816 let table = schema.tables.iter().find(|table| {
817 let qname = if let Some(schema_name) = &table.schema {
818 format!("{}.{}", schema_name, table.name)
819 } else {
820 table.name.clone()
821 };
822 output.name == table.name || output.name == qname
823 })?;
824
825 Some(SchemaDatasetFacet {
826 producer: options.producer.clone(),
827 schema_url: SCHEMA_DATASET_FACET_SCHEMA_URL.to_string(),
828 fields: table
829 .columns
830 .iter()
831 .enumerate()
832 .map(|(idx, col)| SchemaDatasetFacetField {
833 name: col.name.clone(),
834 data_type: col.data_type.clone(),
835 ordinal_position: Some(idx + 1),
836 })
837 .collect(),
838 })
839}
840
841fn expand_star_output_fields(
842 select: &Select,
843 star_expr: &Expression,
844 schema: Option<&dyn Schema>,
845 warnings: &mut Vec<OpenLineageWarning>,
846 fields: &mut Vec<OutputField>,
847) {
848 let Some(schema) = schema else {
849 warnings.push(OpenLineageWarning::new(
850 "W_STAR_WITHOUT_SCHEMA",
851 "SELECT * cannot be expanded into OpenLineage column lineage without schema metadata",
852 ));
853 return;
854 };
855
856 let qualifier = star_qualifier(star_expr);
857 let sources = select_source_tables(select);
858 for (alias, qname) in sources {
859 if qualifier
860 .as_ref()
861 .map(|q| q != &alias && q != &qname)
862 .unwrap_or(false)
863 {
864 continue;
865 }
866 match schema.column_names(&qname) {
867 Ok(columns) => {
868 for name in columns {
869 fields.push(OutputField {
870 lineage_name: name.clone(),
871 name,
872 expression: None,
873 star_source_table: Some(qname.clone()),
874 });
875 }
876 }
877 Err(err) => warnings.push(OpenLineageWarning::new(
878 "W_STAR_SCHEMA_LOOKUP_FAILED",
879 format!("Could not expand SELECT * for table '{}': {}", qname, err),
880 )),
881 }
882 }
883}
884
885fn select_source_tables(select: &Select) -> Vec<(String, String)> {
886 let mut result = Vec::new();
887 if let Some(from) = &select.from {
888 for expr in &from.expressions {
889 collect_source_table(expr, &mut result);
890 }
891 }
892 for join in &select.joins {
893 collect_source_table(&join.this, &mut result);
894 }
895 result
896}
897
898fn collect_source_table(expr: &Expression, result: &mut Vec<(String, String)>) {
899 match expr {
900 Expression::Table(table) => {
901 let qname = table_ref_qualified_name(table);
902 let alias = table
903 .alias
904 .as_ref()
905 .map(|a| a.name.clone())
906 .unwrap_or_else(|| table.name.name.clone());
907 result.push((alias, qname));
908 }
909 Expression::Alias(alias) => collect_source_table(&alias.this, result),
910 Expression::Paren(paren) => collect_source_table(&paren.this, result),
911 _ => {}
912 }
913}
914
915fn leftmost_select(expr: &Expression) -> Option<&Select> {
916 match expr {
917 Expression::Select(select) => Some(select),
918 Expression::Union(union) => leftmost_select(&union.left),
919 Expression::Intersect(intersect) => leftmost_select(&intersect.left),
920 Expression::Except(except) => leftmost_select(&except.left),
921 Expression::Subquery(subquery) => leftmost_select(&subquery.this),
922 _ => None,
923 }
924}
925
926fn output_name(expr: &Expression) -> Option<String> {
927 match expr {
928 Expression::Alias(alias) => Some(alias.alias.name.clone()),
929 Expression::Column(col) => Some(col.name.name.clone()),
930 Expression::Identifier(id) => Some(id.name.clone()),
931 Expression::Annotated(a) => output_name(&a.this),
932 _ => None,
933 }
934}
935
936fn unalias(expr: &Expression) -> &Expression {
937 match expr {
938 Expression::Alias(alias) => &alias.this,
939 Expression::Annotated(a) => unalias(&a.this),
940 _ => expr,
941 }
942}
943
944fn is_star_expr(expr: &Expression) -> bool {
945 matches!(expr, Expression::Star(_))
946 || matches!(expr, Expression::Column(col) if col.name.name == "*")
947}
948
949fn star_qualifier(expr: &Expression) -> Option<String> {
950 match expr {
951 Expression::Star(star) => star.table.as_ref().map(|t| t.name.clone()),
952 Expression::Column(col) if col.name.name == "*" => {
953 col.table.as_ref().map(|t| t.name.clone())
954 }
955 _ => None,
956 }
957}
958
959fn dataset_from_expression(
960 expr: &Expression,
961 options: &OpenLineageOptions,
962) -> Result<OpenLineageDatasetId> {
963 match expr {
964 Expression::Table(table) => dataset_from_table_ref(table, options),
965 Expression::Identifier(id) => dataset_from_table_name(&id.name, options),
966 _ => Err(Error::unsupported(
967 "OpenLineage dataset extraction from non-table expression",
968 options.dialect.to_string(),
969 )),
970 }
971}
972
973fn dataset_from_table_ref(
974 table: &TableRef,
975 options: &OpenLineageOptions,
976) -> Result<OpenLineageDatasetId> {
977 dataset_from_table_name(&table_ref_qualified_name(table), options)
978}
979
980fn dataset_from_table_name(
981 table_name: &str,
982 options: &OpenLineageOptions,
983) -> Result<OpenLineageDatasetId> {
984 if let Some(mapped) = options.dataset_mappings.get(table_name) {
985 return Ok(mapped.clone());
986 }
987 let namespace = options.dataset_namespace.as_ref().ok_or_else(|| {
988 Error::parse(
989 format!(
990 "Missing datasetNamespace or explicit dataset mapping for table '{}'",
991 table_name
992 ),
993 0,
994 0,
995 0,
996 0,
997 )
998 })?;
999 Ok(OpenLineageDatasetId::new(namespace, table_name))
1000}
1001
1002fn table_ref_qualified_name(table: &TableRef) -> String {
1003 let mut parts = Vec::new();
1004 if let Some(catalog) = &table.catalog {
1005 parts.push(catalog.name.clone());
1006 }
1007 if let Some(schema) = &table.schema {
1008 parts.push(schema.name.clone());
1009 }
1010 parts.push(table.name.name.clone());
1011 parts.join(".")
1012}
1013
1014fn collect_cte_aliases(expr: &Expression, dialect: DialectType) -> HashSet<String> {
1015 let mut aliases = HashSet::new();
1016 for node in expr.dfs() {
1017 match node {
1018 Expression::Select(select) => {
1019 if let Some(with) = &select.with {
1020 collect_with_aliases(with, dialect, &mut aliases);
1021 }
1022 }
1023 Expression::Union(union) => {
1024 if let Some(with) = &union.with {
1025 collect_with_aliases(with, dialect, &mut aliases);
1026 }
1027 }
1028 Expression::Intersect(intersect) => {
1029 if let Some(with) = &intersect.with {
1030 collect_with_aliases(with, dialect, &mut aliases);
1031 }
1032 }
1033 Expression::Except(except) => {
1034 if let Some(with) = &except.with {
1035 collect_with_aliases(with, dialect, &mut aliases);
1036 }
1037 }
1038 _ => {}
1039 }
1040 }
1041 aliases
1042}
1043
1044fn collect_with_aliases(with: &With, dialect: DialectType, aliases: &mut HashSet<String>) {
1045 for cte in &with.ctes {
1046 aliases.insert(normalize_identifier(&cte.alias.name, dialect, true));
1047 }
1048}
1049
/// Normalizes an identifier by delegating to `crate::schema::normalize_name`
/// with the given dialect and `is_table` flag; the final argument is always
/// `true`.
fn normalize_identifier(name: &str, dialect: DialectType, is_table: bool) -> String {
    crate::schema::normalize_name(name, Some(dialect), is_table, true)
}
1053
/// Wraps a `serde_json` error in an internal crate error with an
/// OpenLineage-specific message.
fn openlineage_serialization_error(err: serde_json::Error) -> Error {
    Error::internal(format!("OpenLineage serialization failed: {err}"))
}
1057
1058fn deserialize_dialect_type<'de, D>(deserializer: D) -> std::result::Result<DialectType, D::Error>
1059where
1060 D: Deserializer<'de>,
1061{
1062 let value = String::deserialize(deserializer)?;
1063 value.parse::<DialectType>().map_err(de::Error::custom)
1064}
1065
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline options shared by the tests: PostgreSQL dialect, fixed job and
    /// run metadata, and an explicit `analytics.out` output dataset.
    fn options() -> OpenLineageOptions {
        let warehouse = "postgres://warehouse";
        OpenLineageOptions {
            event_type: Some(OpenLineageRunEventType::Complete),
            run_id: Some(String::from("3b452093-782c-4ef2-9c0c-aafe2aa6f34d")),
            event_time: Some(String::from("2026-05-18T00:00:00Z")),
            job_name: Some(String::from("lineage-test")),
            job_namespace: Some(String::from("polyglot-tests")),
            output_dataset: Some(OpenLineageDatasetId::new(warehouse, "analytics.out")),
            dataset_namespace: Some(String::from(warehouse)),
            producer: String::from("https://github.com/tobilg/polyglot"),
            dialect: DialectType::PostgreSQL,
            ..Default::default()
        }
    }

    /// Same as [`options`], but with no explicit output dataset configured.
    fn options_without_output_dataset() -> OpenLineageOptions {
        OpenLineageOptions {
            output_dataset: None,
            ..options()
        }
    }

    /// Builds a schema column with the given name and data type; every other
    /// attribute is left unset (`None` / `false`).
    fn schema_column(name: &str, data_type: &str) -> crate::validation::SchemaColumn {
        crate::validation::SchemaColumn {
            name: name.to_string(),
            data_type: data_type.to_string(),
            nullable: None,
            primary_key: false,
            unique: false,
            references: None,
        }
    }

    // The "postgres" alias must deserialize to the PostgreSQL dialect.
    #[test]
    fn deserializes_dialect_aliases_in_options() {
        let payload = r#"{"producer":"polyglot","dialect":"postgres"}"#;
        let parsed: OpenLineageOptions = serde_json::from_str(payload).expect("options");
        assert_eq!(parsed.dialect, DialectType::PostgreSQL);
    }

    // A plain column projection yields exactly one IDENTITY input field.
    #[test]
    fn emits_identity_column_lineage_for_select() {
        let outcome = openlineage_column_lineage("SELECT a FROM t", &options()).expect("lineage");
        let column = outcome.facet.fields.get("a").expect("field a");
        assert_eq!(column.input_fields.len(), 1);

        let source = &column.input_fields[0];
        assert_eq!(source.name, "t");
        assert_eq!(source.field, "a");
        assert_eq!(source.transformations[0].subtype, "IDENTITY");
    }

    // The input dataset is reported under the table name, not its alias.
    #[test]
    fn resolves_input_dataset_behind_table_alias() {
        let sql = "SELECT o.total FROM orders o";
        let outcome = openlineage_column_lineage(sql, &options()).expect("lineage");
        let column = outcome.facet.fields.get("total").expect("field total");

        let source = &column.input_fields[0];
        assert_eq!(source.name, "orders");
        assert_eq!(source.field, "total");
    }

    // An arithmetic expression depends on both operands, each tagged TRANSFORMATION.
    #[test]
    fn emits_transformation_column_lineage_for_expression() {
        let sql = "SELECT a + b AS c FROM t";
        let outcome = openlineage_column_lineage(sql, &options()).expect("lineage");
        let column = outcome.facet.fields.get("c").expect("field c");
        let sources = &column.input_fields;

        assert_eq!(sources.len(), 2);
        for expected in ["a", "b"] {
            assert!(sources.iter().any(|f| f.field == expected));
        }
        let has_other_subtype = sources
            .iter()
            .any(|f| f.transformations[0].subtype != "TRANSFORMATION");
        assert!(!has_other_subtype);
    }

    // An aggregate function call is tagged AGGREGATION on its input column.
    #[test]
    fn emits_aggregation_column_lineage() {
        let sql = "SELECT SUM(amount) AS total FROM orders";
        let outcome = openlineage_column_lineage(sql, &options()).expect("lineage");
        let column = outcome.facet.fields.get("total").expect("field total");

        let source = &column.input_fields[0];
        assert_eq!(source.field, "amount");
        assert_eq!(source.transformations[0].subtype, "AGGREGATION");
    }

    // With no configured output dataset, the INSERT target becomes the output.
    #[test]
    fn infers_insert_output_dataset() {
        let config = options_without_output_dataset();
        let sql = "INSERT INTO analytics.out SELECT a FROM raw.input";
        let outcome = openlineage_column_lineage(sql, &config).expect("lineage");

        assert_eq!(outcome.outputs[0].name, "analytics.out");
        assert_eq!(outcome.inputs[0].name, "raw.input");
    }

    // Output fields are keyed by the INSERT column list, not the SELECT names.
    #[test]
    fn maps_insert_target_columns_to_output_fields() {
        let config = options_without_output_dataset();
        let sql = "INSERT INTO analytics.out (target_a) SELECT source_a FROM raw.input";
        let outcome = openlineage_column_lineage(sql, &config).expect("lineage");

        let fields = &outcome.facet.fields;
        let target = fields.get("target_a").expect("target field");
        assert_eq!(target.input_fields[0].field, "source_a");
        assert!(!fields.contains_key("source_a"));
    }

    // A bare SELECT has no target to infer, so a missing output dataset is an error.
    #[test]
    fn pure_select_requires_output_dataset() {
        let config = options_without_output_dataset();
        let err = openlineage_column_lineage("SELECT a FROM t", &config).unwrap_err();
        let message = err.to_string();
        assert!(message.contains("outputDataset is required"));
    }

    // The job event carries the job namespace, the SQL facet and column lineage.
    #[test]
    fn emits_job_event_payload() {
        let outcome = openlineage_job_event("SELECT a FROM t", &options()).expect("event");
        let event = &outcome.event;

        let job = &event["job"];
        assert_eq!(job["namespace"], "polyglot-tests");
        assert_eq!(job["facets"]["sql"]["_schemaURL"], SQL_JOB_FACET_SCHEMA_URL);

        let lineage_fields = &event["outputs"][0]["facets"]["columnLineage"]["fields"];
        assert_eq!(lineage_fields["a"]["inputFields"][0]["field"], "a");
    }

    // The run event echoes the configured event type and run id.
    #[test]
    fn emits_run_event_payload() {
        let outcome = openlineage_run_event("SELECT a FROM t", &options()).expect("event");
        let event = &outcome.event;

        assert_eq!(event["eventType"], "COMPLETE");
        assert_eq!(event["run"]["runId"], "3b452093-782c-4ef2-9c0c-aafe2aa6f34d");
    }

    // Without a schema, `*` produces no fields and a warning is raised instead.
    #[test]
    fn select_star_without_schema_warns() {
        let outcome = openlineage_column_lineage("SELECT * FROM t", &options()).expect("lineage");
        assert!(outcome.facet.fields.is_empty());

        let star_warning_raised = outcome
            .warnings
            .iter()
            .any(|w| w.code == "W_STAR_WITHOUT_SCHEMA");
        assert!(star_warning_raised);
    }

    // With a schema for `t`, `*` expands to the table's declared columns.
    #[test]
    fn select_star_with_schema_expands_fields() {
        let table = crate::validation::SchemaTable {
            name: "t".to_string(),
            schema: None,
            columns: vec![schema_column("a", "INT"), schema_column("b", "TEXT")],
            aliases: Vec::new(),
            primary_key: Vec::new(),
            unique_keys: Vec::new(),
            foreign_keys: Vec::new(),
        };
        let config = OpenLineageOptions {
            schema: Some(ValidationSchema {
                strict: None,
                tables: vec![table],
            }),
            ..options()
        };

        let outcome = openlineage_column_lineage("SELECT * FROM t", &config).expect("lineage");
        for expected in ["a", "b"] {
            assert!(outcome.facet.fields.contains_key(expected));
        }
    }
}