1use crate::dialects::{Dialect, DialectType};
8use crate::expressions::*;
9use crate::lineage::{self, LineageNode};
10use crate::schema::Schema;
11use crate::traversal::ExpressionWalk;
12use crate::{mapping_schema_from_validation_schema, Error, Result, ValidationSchema};
13use serde::de::{self, Deserializer};
14use serde::{Deserialize, Serialize};
15use serde_json::{json, Value};
16use std::collections::{BTreeMap, BTreeSet, HashSet};
17
/// Schema URL of the OpenLineage core spec; emitted as `schemaURL` on generated events.
pub const OPENLINEAGE_SCHEMA_URL: &str = "https://openlineage.io/spec/2-0-2/OpenLineage.json";
/// Schema URL emitted as `_schemaURL` on the column lineage dataset facet.
pub const COLUMN_LINEAGE_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/ColumnLineageDatasetFacet.json";
/// Schema URL emitted as `_schemaURL` on the `sql` job facet.
pub const SQL_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-1-0/SQLJobFacet.json";
/// Schema URL emitted as `_schemaURL` on the `jobType` job facet.
pub const JOB_TYPE_JOB_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json";
/// Schema URL emitted as `_schemaURL` on the `schema` dataset facet.
pub const SCHEMA_DATASET_FACET_SCHEMA_URL: &str =
    "https://openlineage.io/spec/facets/1-2-0/SchemaDatasetFacet.json";
27
/// Identifies an OpenLineage dataset by namespace and name.
///
/// Used both for explicit table-to-dataset mappings and for the output dataset option.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageDatasetId {
    /// Namespace the dataset lives in.
    pub namespace: String,
    /// Name of the dataset inside `namespace`.
    pub name: String,
}
35
36impl OpenLineageDatasetId {
37 pub fn new(namespace: impl Into<String>, name: impl Into<String>) -> Self {
38 Self {
39 namespace: namespace.into(),
40 name: name.into(),
41 }
42 }
43}
44
/// Options controlling OpenLineage generation.
///
/// Deserialized with camelCase keys; every field falls back to its `Default` when absent.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase", default)]
pub struct OpenLineageOptions {
    /// SQL dialect used for parsing; deserialized from a string via `deserialize_dialect_type`.
    #[serde(deserialize_with = "deserialize_dialect_type")]
    pub dialect: DialectType,
    /// Producer identifier written into events and facets; must not be blank.
    pub producer: String,
    /// Namespace applied to tables that have no entry in `dataset_mappings`.
    pub dataset_namespace: Option<String>,
    /// Explicit table-name to dataset mappings, looked up by the exact table name string.
    pub dataset_mappings: BTreeMap<String, OpenLineageDatasetId>,
    /// Output dataset used for `SELECT` statements that have no `INTO` target.
    pub output_dataset: Option<OpenLineageDatasetId>,
    /// Optional schema metadata, used for lineage resolution, `SELECT *` expansion
    /// and the output dataset's `schema` facet.
    pub schema: Option<ValidationSchema>,
    /// Job namespace; required by the job and run event builders.
    pub job_namespace: Option<String>,
    /// Job name; required by the job and run event builders.
    pub job_name: Option<String>,
    /// Event time string; required by the job and run event builders and emitted verbatim.
    pub event_time: Option<String>,
    /// Run identifier; required by the run event builder.
    pub run_id: Option<String>,
    /// Run event type; required by the run event builder.
    pub event_type: Option<OpenLineageRunEventType>,
}
62
/// Run event type emitted as `eventType` on run events.
///
/// Serialized in SCREAMING_SNAKE_CASE (for example `Complete` becomes `"COMPLETE"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum OpenLineageRunEventType {
    Start,
    Running,
    Complete,
    Abort,
    Fail,
    Other,
}
74
/// Non-fatal diagnostic produced while generating OpenLineage output.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageWarning {
    /// Stable machine-readable code, e.g. `W_STAR_WITHOUT_SCHEMA`.
    pub code: String,
    /// Human-readable explanation of the warning.
    pub message: String,
}
82
83impl OpenLineageWarning {
84 fn new(code: impl Into<String>, message: impl Into<String>) -> Self {
85 Self {
86 code: code.into(),
87 message: message.into(),
88 }
89 }
90}
91
/// Result of column lineage generation for a single statement.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageColumnLineageResult {
    /// Column lineage facet keyed by output field name.
    pub facet: ColumnLineageDatasetFacet,
    /// Input datasets referenced by the statement.
    pub inputs: Vec<OpenLineageDataset>,
    /// Output datasets; the generator emits one, with its facets attached.
    pub outputs: Vec<OpenLineageDataset>,
    /// Non-fatal diagnostics collected during generation.
    pub warnings: Vec<OpenLineageWarning>,
}
100
/// Result of building an OpenLineage job or run event.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageEventResult {
    /// The event payload as a JSON value.
    pub event: Value,
    /// Non-fatal diagnostics collected while computing the lineage for the event.
    pub warnings: Vec<OpenLineageWarning>,
}
107
/// An OpenLineage dataset entry as emitted in `inputs` / `outputs`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct OpenLineageDataset {
    /// Dataset namespace.
    pub namespace: String,
    /// Dataset name.
    pub name: String,
    /// Facets keyed by facet name; omitted from serialized output when empty.
    #[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
    pub facets: BTreeMap<String, Value>,
}
115
116impl std::convert::From<OpenLineageDatasetId> for OpenLineageDataset {
117 fn from(id: OpenLineageDatasetId) -> Self {
118 Self {
119 namespace: id.namespace,
120 name: id.name,
121 facets: BTreeMap::new(),
122 }
123 }
124}
125
/// Column lineage dataset facet attached to an output dataset.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ColumnLineageDatasetFacet {
    /// Producer identifier, serialized as `_producer`.
    #[serde(rename = "_producer")]
    pub producer: String,
    /// Facet schema URL, serialized as `_schemaURL`.
    #[serde(rename = "_schemaURL")]
    pub schema_url: String,
    /// Lineage per output field, keyed by output field name.
    pub fields: BTreeMap<String, ColumnLineageField>,
}
134
/// Lineage of one output field.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ColumnLineageField {
    /// Input fields the output field is derived from; serialized as `inputFields`.
    pub input_fields: Vec<OpenLineageInputField>,
}
140
/// A single input field reference inside a column lineage entry.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OpenLineageInputField {
    /// Namespace of the input dataset.
    pub namespace: String,
    /// Name of the input dataset.
    pub name: String,
    /// Name of the field within the input dataset.
    pub field: String,
    /// Transformations applied to this input; omitted from serialized output when empty.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub transformations: Vec<OpenLineageTransformation>,
}
150
/// Describes how an input field contributes to an output field.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct OpenLineageTransformation {
    /// Transformation type, serialized as `type`; this module always emits `DIRECT`.
    #[serde(rename = "type")]
    pub type_: String,
    /// Transformation subtype; this module emits `IDENTITY`, `TRANSFORMATION` or `AGGREGATION`.
    pub subtype: String,
    /// Optional textual description; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub description: Option<String>,
    /// Optional masking flag; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub masking: Option<bool>,
}
161
/// Internal summary of a parsed statement: what is queried, read and written.
#[derive(Debug, Clone)]
struct StatementAnalysis {
    /// The query expression whose projections define the output fields.
    query: Expression,
    /// Datasets read by the statement.
    inputs: Vec<OpenLineageDatasetId>,
    /// Dataset written by the statement.
    output: OpenLineageDatasetId,
    /// Explicit target column names (INSERT / CREATE TABLE column lists); empty when absent.
    output_column_names: Vec<String>,
}
169
/// One projected output column of the analyzed query.
#[derive(Debug, Clone)]
struct OutputField {
    /// Name used as the key in the emitted fields map (may be overridden by target columns).
    name: String,
    /// Name passed to the lineage resolver; stays the projected column name.
    lineage_name: String,
    /// Projection expression; `None` for columns produced by star expansion.
    expression: Option<Expression>,
    /// Source table for columns produced by star expansion; `None` otherwise.
    star_source_table: Option<String>,
}
177
/// A leaf of the lineage graph: a column of a source table.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct TerminalField {
    /// Source table name (may be qualified).
    table: String,
    /// Column name within the table.
    field: String,
}
183
184pub fn openlineage_column_lineage(
186 sql: &str,
187 options: &OpenLineageOptions,
188) -> Result<OpenLineageColumnLineageResult> {
189 validate_common_options(options)?;
190
191 let mut warnings = Vec::new();
192 let schema_mapping = options
193 .schema
194 .as_ref()
195 .map(mapping_schema_from_validation_schema);
196 let dialect = Dialect::get(options.dialect);
197 let mut expressions = dialect.parse(sql)?;
198 if expressions.len() != 1 {
199 return Err(Error::parse(
200 format!(
201 "OpenLineage generation expects exactly one statement, found {}",
202 expressions.len()
203 ),
204 0,
205 0,
206 0,
207 0,
208 ));
209 }
210
211 let expr = expressions.remove(0);
212 let analysis = analyze_statement(&expr, options, &mut warnings)?;
213 let mut output_fields = output_fields_for_query(
214 &analysis.query,
215 schema_mapping.as_ref().map(|s| s as &dyn Schema),
216 options.dialect,
217 &mut warnings,
218 )?;
219 apply_output_column_names(
220 &mut output_fields,
221 &analysis.output_column_names,
222 &mut warnings,
223 );
224
225 let mut fields = BTreeMap::new();
226 for output_field in output_fields {
227 if fields.contains_key(&output_field.name) {
228 warnings.push(OpenLineageWarning::new(
229 "W_DUPLICATE_OUTPUT_FIELD",
230 format!(
231 "Duplicate output field '{}' was merged in the OpenLineage fields map",
232 output_field.name
233 ),
234 ));
235 }
236
237 let input_fields = input_fields_for_output(
238 &analysis.query,
239 &output_field,
240 options,
241 schema_mapping.as_ref().map(|s| s as &dyn Schema),
242 &mut warnings,
243 )?;
244
245 fields.insert(output_field.name, ColumnLineageField { input_fields });
246 }
247
248 let mut outputs = vec![OpenLineageDataset::from(analysis.output.clone())];
249 attach_output_facets(&mut outputs[0], &analysis.output, options, &fields)?;
250
251 Ok(OpenLineageColumnLineageResult {
252 facet: ColumnLineageDatasetFacet {
253 producer: options.producer.clone(),
254 schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
255 fields,
256 },
257 inputs: analysis
258 .inputs
259 .into_iter()
260 .map(OpenLineageDataset::from)
261 .collect(),
262 outputs,
263 warnings,
264 })
265}
266
267pub fn openlineage_job_event(
269 sql: &str,
270 options: &OpenLineageOptions,
271) -> Result<OpenLineageEventResult> {
272 let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
273 let job_name = required_option(&options.job_name, "jobName")?;
274 let event_time = required_option(&options.event_time, "eventTime")?;
275
276 let result = openlineage_column_lineage(sql, options)?;
277 let event = json!({
278 "eventTime": event_time,
279 "producer": options.producer,
280 "schemaURL": OPENLINEAGE_SCHEMA_URL,
281 "job": {
282 "namespace": job_namespace,
283 "name": job_name,
284 "facets": job_facets(sql, options),
285 },
286 "inputs": result.inputs,
287 "outputs": result.outputs,
288 });
289
290 Ok(OpenLineageEventResult {
291 event,
292 warnings: result.warnings,
293 })
294}
295
296pub fn openlineage_run_event(
298 sql: &str,
299 options: &OpenLineageOptions,
300) -> Result<OpenLineageEventResult> {
301 let job_namespace = required_option(&options.job_namespace, "jobNamespace")?;
302 let job_name = required_option(&options.job_name, "jobName")?;
303 let event_time = required_option(&options.event_time, "eventTime")?;
304 let run_id = required_option(&options.run_id, "runId")?;
305 let event_type = options
306 .event_type
307 .ok_or_else(|| Error::parse("Missing required option: eventType", 0, 0, 0, 0))?;
308
309 let result = openlineage_column_lineage(sql, options)?;
310 let event = json!({
311 "eventTime": event_time,
312 "eventType": event_type,
313 "producer": options.producer,
314 "schemaURL": OPENLINEAGE_SCHEMA_URL,
315 "run": {
316 "runId": run_id,
317 "facets": {},
318 },
319 "job": {
320 "namespace": job_namespace,
321 "name": job_name,
322 "facets": job_facets(sql, options),
323 },
324 "inputs": result.inputs,
325 "outputs": result.outputs,
326 });
327
328 Ok(OpenLineageEventResult {
329 event,
330 warnings: result.warnings,
331 })
332}
333
334fn validate_common_options(options: &OpenLineageOptions) -> Result<()> {
335 if options.producer.trim().is_empty() {
336 return Err(Error::parse(
337 "Missing required option: producer",
338 0,
339 0,
340 0,
341 0,
342 ));
343 }
344 Ok(())
345}
346
347fn required_option(value: &Option<String>, name: &str) -> Result<String> {
348 match value.as_ref().filter(|v| !v.trim().is_empty()) {
349 Some(value) => Ok(value.clone()),
350 None => Err(Error::parse(
351 format!("Missing required option: {name}"),
352 0,
353 0,
354 0,
355 0,
356 )),
357 }
358}
359
360fn analyze_statement(
361 expr: &Expression,
362 options: &OpenLineageOptions,
363 warnings: &mut Vec<OpenLineageWarning>,
364) -> Result<StatementAnalysis> {
365 match expr {
366 Expression::Select(select) => {
367 let output = if let Some(into) = &select.into {
368 dataset_from_expression(&into.this, options)?
369 } else {
370 options.output_dataset.clone().ok_or_else(|| {
371 Error::parse(
372 "OpenLineage outputDataset is required for SELECT statements without SELECT INTO",
373 0,
374 0,
375 0,
376 0,
377 )
378 })?
379 };
380 Ok(StatementAnalysis {
381 query: expr.clone(),
382 inputs: collect_input_datasets(expr, options, Some(&output), warnings)?,
383 output,
384 output_column_names: Vec::new(),
385 })
386 }
387 Expression::Insert(insert) => {
388 let output = dataset_from_table_ref(&insert.table, options)?;
389 let query = insert.query.clone().ok_or_else(|| {
390 Error::unsupported(
391 "OpenLineage column lineage for INSERT without query",
392 options.dialect.to_string(),
393 )
394 })?;
395 Ok(StatementAnalysis {
396 inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
397 query,
398 output,
399 output_column_names: insert.columns.iter().map(|col| col.name.clone()).collect(),
400 })
401 }
402 Expression::CreateTable(create) => {
403 let output = dataset_from_table_ref(&create.name, options)?;
404 let query = create.as_select.clone().ok_or_else(|| {
405 Error::unsupported(
406 "OpenLineage column lineage for CREATE TABLE without AS SELECT",
407 options.dialect.to_string(),
408 )
409 })?;
410 Ok(StatementAnalysis {
411 inputs: collect_input_datasets(&query, options, Some(&output), warnings)?,
412 query,
413 output,
414 output_column_names: create
415 .columns
416 .iter()
417 .map(|col| col.name.name.clone())
418 .collect(),
419 })
420 }
421 _ => Err(Error::unsupported(
422 format!("OpenLineage generation for {}", expr.variant_name()),
423 options.dialect.to_string(),
424 )),
425 }
426}
427
428fn output_fields_for_query(
429 query: &Expression,
430 schema: Option<&dyn Schema>,
431 dialect: DialectType,
432 warnings: &mut Vec<OpenLineageWarning>,
433) -> Result<Vec<OutputField>> {
434 let select = leftmost_select(query).ok_or_else(|| {
435 Error::unsupported(
436 "OpenLineage output field extraction for non-SELECT query",
437 dialect.to_string(),
438 )
439 })?;
440
441 let mut fields = Vec::new();
442 for (idx, expr) in select.expressions.iter().enumerate() {
443 if is_star_expr(expr) {
444 expand_star_output_fields(select, expr, schema, warnings, &mut fields);
445 continue;
446 }
447
448 let name = output_name(expr).unwrap_or_else(|| format!("_{idx}"));
449 fields.push(OutputField {
450 lineage_name: name.clone(),
451 name,
452 expression: Some(expr.clone()),
453 star_source_table: None,
454 });
455 }
456 Ok(fields)
457}
458
459fn apply_output_column_names(
460 fields: &mut [OutputField],
461 output_column_names: &[String],
462 warnings: &mut Vec<OpenLineageWarning>,
463) {
464 if output_column_names.is_empty() {
465 return;
466 }
467 if output_column_names.len() != fields.len() {
468 warnings.push(OpenLineageWarning::new(
469 "W_OUTPUT_COLUMN_COUNT_MISMATCH",
470 format!(
471 "Target column count ({}) does not match projected column count ({})",
472 output_column_names.len(),
473 fields.len()
474 ),
475 ));
476 return;
477 }
478 for (field, output_name) in fields.iter_mut().zip(output_column_names) {
479 field.name = output_name.clone();
480 }
481}
482
483fn input_fields_for_output(
484 query: &Expression,
485 output_field: &OutputField,
486 options: &OpenLineageOptions,
487 schema: Option<&dyn Schema>,
488 warnings: &mut Vec<OpenLineageWarning>,
489) -> Result<Vec<OpenLineageInputField>> {
490 if let Some(table) = &output_field.star_source_table {
491 return terminal_fields_to_openlineage(
492 vec![TerminalField {
493 table: table.clone(),
494 field: output_field.lineage_name.clone(),
495 }],
496 "IDENTITY",
497 Some(format!("SELECT {}", output_field.lineage_name)),
498 options,
499 warnings,
500 );
501 }
502
503 let lineage_result = if let Some(schema) = schema {
504 lineage::lineage_with_schema(
505 &output_field.lineage_name,
506 query,
507 Some(schema),
508 Some(options.dialect),
509 false,
510 )
511 } else {
512 lineage::lineage(
513 &output_field.lineage_name,
514 query,
515 Some(options.dialect),
516 false,
517 )
518 };
519
520 let node = match lineage_result {
521 Ok(node) => node,
522 Err(err) => {
523 warnings.push(OpenLineageWarning::new(
524 "W_UNRESOLVED_OUTPUT_FIELD",
525 format!(
526 "Could not resolve lineage for output field '{}': {}",
527 output_field.name, err
528 ),
529 ));
530 return Ok(Vec::new());
531 }
532 };
533
534 let mut terminals = BTreeSet::new();
535 collect_terminal_fields(&node, &mut terminals);
536 let terminals: Vec<TerminalField> = terminals.into_iter().collect();
537
538 if terminals.is_empty() {
539 warnings.push(OpenLineageWarning::new(
540 "W_EMPTY_FIELD_LINEAGE",
541 format!(
542 "No input fields were found for output field '{}'",
543 output_field.name
544 ),
545 ));
546 return Ok(Vec::new());
547 }
548
549 let subtype = transformation_subtype(output_field.expression.as_ref(), &terminals);
550 let description = output_field
551 .expression
552 .as_ref()
553 .map(|expr| expr.sql_for(options.dialect));
554
555 terminal_fields_to_openlineage(terminals, subtype, description, options, warnings)
556}
557
558fn terminal_fields_to_openlineage(
559 terminals: Vec<TerminalField>,
560 subtype: &str,
561 description: Option<String>,
562 options: &OpenLineageOptions,
563 warnings: &mut Vec<OpenLineageWarning>,
564) -> Result<Vec<OpenLineageInputField>> {
565 let mut result = Vec::new();
566 for terminal in terminals {
567 let dataset = dataset_from_table_name(&terminal.table, options).map_err(|err| {
568 warnings.push(OpenLineageWarning::new(
569 "W_UNRESOLVED_DATASET",
570 format!(
571 "Could not map table '{}' to an OpenLineage dataset: {}",
572 terminal.table, err
573 ),
574 ));
575 err
576 })?;
577 result.push(OpenLineageInputField {
578 namespace: dataset.namespace,
579 name: dataset.name,
580 field: terminal.field,
581 transformations: vec![OpenLineageTransformation {
582 type_: "DIRECT".to_string(),
583 subtype: subtype.to_string(),
584 description: description.clone(),
585 masking: Some(false),
586 }],
587 });
588 }
589 Ok(result)
590}
591
592fn transformation_subtype(expr: Option<&Expression>, terminals: &[TerminalField]) -> &'static str {
593 let Some(expr) = expr else {
594 return "TRANSFORMATION";
595 };
596 let unaliased = unalias(expr);
597 if expression_contains_aggregate(unaliased) {
598 return "AGGREGATION";
599 }
600 if terminals.len() == 1 {
601 if let Expression::Column(col) = unaliased {
602 if col.name.name == terminals[0].field {
603 return "IDENTITY";
604 }
605 }
606 }
607 "TRANSFORMATION"
608}
609
610fn collect_terminal_fields(node: &LineageNode, terminals: &mut BTreeSet<TerminalField>) {
611 if node.downstream.is_empty() {
612 if let Expression::Column(column) = &node.expression {
613 let table = if !node.source_name.is_empty() {
614 Some(node.source_name.clone())
615 } else if let Expression::Table(table) = &node.source {
616 Some(table_ref_qualified_name(table))
617 } else {
618 column.table.as_ref().map(|t| t.name.clone())
619 };
620 if let Some(table) = table.filter(|t| !t.is_empty()) {
621 terminals.insert(TerminalField {
622 table,
623 field: column.name.name.clone(),
624 });
625 }
626 }
627 return;
628 }
629
630 for child in &node.downstream {
631 collect_terminal_fields(child, terminals);
632 }
633}
634
635fn expression_contains_aggregate(expr: &Expression) -> bool {
636 expr.contains(|node| {
637 matches!(
638 node,
639 Expression::AggregateFunction(_)
640 | Expression::Sum(_)
641 | Expression::Count(_)
642 | Expression::Avg(_)
643 | Expression::Min(_)
644 | Expression::Max(_)
645 | Expression::GroupConcat(_)
646 | Expression::StringAgg(_)
647 | Expression::ListAgg(_)
648 | Expression::ArrayAgg(_)
649 | Expression::CountIf(_)
650 | Expression::SumIf(_)
651 | Expression::Stddev(_)
652 | Expression::StddevPop(_)
653 | Expression::StddevSamp(_)
654 | Expression::Variance(_)
655 | Expression::VarPop(_)
656 | Expression::VarSamp(_)
657 | Expression::Median(_)
658 | Expression::Mode(_)
659 | Expression::First(_)
660 | Expression::Last(_)
661 | Expression::AnyValue(_)
662 | Expression::ApproxDistinct(_)
663 | Expression::ApproxCountDistinct(_)
664 | Expression::ApproxPercentile(_)
665 | Expression::Percentile(_)
666 | Expression::LogicalAnd(_)
667 | Expression::LogicalOr(_)
668 | Expression::Skewness(_)
669 | Expression::BitwiseCount(_)
670 | Expression::ArrayConcatAgg(_)
671 | Expression::ArrayUniqueAgg(_)
672 | Expression::BoolXorAgg(_)
673 | Expression::ParameterizedAgg(_)
674 | Expression::ArgMax(_)
675 | Expression::ArgMin(_)
676 | Expression::ApproxTopK(_)
677 | Expression::ApproxTopKAccumulate(_)
678 | Expression::ApproxTopKCombine(_)
679 | Expression::ApproxTopKEstimate(_)
680 | Expression::ApproxTopSum(_)
681 | Expression::ApproxQuantiles(_)
682 | Expression::Grouping(_)
683 | Expression::GroupingId(_)
684 | Expression::AnonymousAggFunc(_)
685 | Expression::CombinedAggFunc(_)
686 | Expression::CombinedParameterizedAgg(_)
687 | Expression::HashAgg(_)
688 | Expression::ObjectAgg(_)
689 | Expression::AIAgg(_)
690 )
691 })
692}
693
694fn collect_input_datasets(
695 expr: &Expression,
696 options: &OpenLineageOptions,
697 output: Option<&OpenLineageDatasetId>,
698 warnings: &mut Vec<OpenLineageWarning>,
699) -> Result<Vec<OpenLineageDatasetId>> {
700 let cte_aliases = collect_cte_aliases(expr, options.dialect);
701 let mut seen = BTreeSet::new();
702 let mut result = Vec::new();
703
704 for table in expr.dfs().filter_map(|node| match node {
705 Expression::Table(table) => Some(table),
706 _ => None,
707 }) {
708 let qname = table_ref_qualified_name(table);
709 let normalized = normalize_identifier(&table.name.name, options.dialect, true);
710 if cte_aliases.contains(&normalized) {
711 continue;
712 }
713 if output
714 .map(|out| out.name == qname || out.name == table.name.name)
715 .unwrap_or(false)
716 {
717 continue;
718 }
719 match dataset_from_table_name(&qname, options) {
720 Ok(dataset) => {
721 if seen.insert((dataset.namespace.clone(), dataset.name.clone())) {
722 result.push(dataset);
723 }
724 }
725 Err(err) => warnings.push(OpenLineageWarning::new(
726 "W_UNRESOLVED_DATASET",
727 format!("Could not map input table '{qname}': {err}"),
728 )),
729 }
730 }
731
732 Ok(result)
733}
734
735fn attach_output_facets(
736 output: &mut OpenLineageDataset,
737 output_id: &OpenLineageDatasetId,
738 options: &OpenLineageOptions,
739 fields: &BTreeMap<String, ColumnLineageField>,
740) -> Result<()> {
741 let column_lineage = ColumnLineageDatasetFacet {
742 producer: options.producer.clone(),
743 schema_url: COLUMN_LINEAGE_FACET_SCHEMA_URL.to_string(),
744 fields: fields.clone(),
745 };
746 output.facets.insert(
747 "columnLineage".to_string(),
748 serde_json::to_value(column_lineage).map_err(openlineage_serialization_error)?,
749 );
750
751 if let Some(schema_facet) = schema_facet_for_dataset(output_id, options) {
752 output.facets.insert(
753 "schema".to_string(),
754 serde_json::to_value(schema_facet).map_err(openlineage_serialization_error)?,
755 );
756 }
757
758 Ok(())
759}
760
761fn job_facets(sql: &str, options: &OpenLineageOptions) -> Value {
762 json!({
763 "sql": {
764 "_producer": options.producer,
765 "_schemaURL": SQL_JOB_FACET_SCHEMA_URL,
766 "query": sql,
767 "dialect": options.dialect.to_string(),
768 },
769 "jobType": {
770 "_producer": options.producer,
771 "_schemaURL": JOB_TYPE_JOB_FACET_SCHEMA_URL,
772 "processingType": "BATCH",
773 "integration": "POLYGLOT_SQL",
774 "jobType": "QUERY",
775 }
776 })
777}
778
/// Schema dataset facet describing the columns of a dataset.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacet {
    /// Producer identifier, serialized as `_producer`.
    #[serde(rename = "_producer")]
    producer: String,
    /// Facet schema URL, serialized as `_schemaURL`.
    #[serde(rename = "_schemaURL")]
    schema_url: String,
    /// Column descriptions.
    fields: Vec<SchemaDatasetFacetField>,
}
787
/// One column entry of the schema dataset facet.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct SchemaDatasetFacetField {
    /// Column name.
    name: String,
    /// Column data type, serialized as `type`; omitted from serialized output when empty.
    #[serde(skip_serializing_if = "String::is_empty", default)]
    #[serde(rename = "type")]
    data_type: String,
    /// Position of the column; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    ordinal_position: Option<usize>,
}
797
798fn schema_facet_for_dataset(
799 output: &OpenLineageDatasetId,
800 options: &OpenLineageOptions,
801) -> Option<SchemaDatasetFacet> {
802 let schema = options.schema.as_ref()?;
803 let table = schema.tables.iter().find(|table| {
804 let qname = if let Some(schema_name) = &table.schema {
805 format!("{}.{}", schema_name, table.name)
806 } else {
807 table.name.clone()
808 };
809 output.name == table.name || output.name == qname
810 })?;
811
812 Some(SchemaDatasetFacet {
813 producer: options.producer.clone(),
814 schema_url: SCHEMA_DATASET_FACET_SCHEMA_URL.to_string(),
815 fields: table
816 .columns
817 .iter()
818 .enumerate()
819 .map(|(idx, col)| SchemaDatasetFacetField {
820 name: col.name.clone(),
821 data_type: col.data_type.clone(),
822 ordinal_position: Some(idx + 1),
823 })
824 .collect(),
825 })
826}
827
828fn expand_star_output_fields(
829 select: &Select,
830 star_expr: &Expression,
831 schema: Option<&dyn Schema>,
832 warnings: &mut Vec<OpenLineageWarning>,
833 fields: &mut Vec<OutputField>,
834) {
835 let Some(schema) = schema else {
836 warnings.push(OpenLineageWarning::new(
837 "W_STAR_WITHOUT_SCHEMA",
838 "SELECT * cannot be expanded into OpenLineage column lineage without schema metadata",
839 ));
840 return;
841 };
842
843 let qualifier = star_qualifier(star_expr);
844 let sources = select_source_tables(select);
845 for (alias, qname) in sources {
846 if qualifier
847 .as_ref()
848 .map(|q| q != &alias && q != &qname)
849 .unwrap_or(false)
850 {
851 continue;
852 }
853 match schema.column_names(&qname) {
854 Ok(columns) => {
855 for name in columns {
856 fields.push(OutputField {
857 lineage_name: name.clone(),
858 name,
859 expression: None,
860 star_source_table: Some(qname.clone()),
861 });
862 }
863 }
864 Err(err) => warnings.push(OpenLineageWarning::new(
865 "W_STAR_SCHEMA_LOOKUP_FAILED",
866 format!("Could not expand SELECT * for table '{}': {}", qname, err),
867 )),
868 }
869 }
870}
871
872fn select_source_tables(select: &Select) -> Vec<(String, String)> {
873 let mut result = Vec::new();
874 if let Some(from) = &select.from {
875 for expr in &from.expressions {
876 collect_source_table(expr, &mut result);
877 }
878 }
879 for join in &select.joins {
880 collect_source_table(&join.this, &mut result);
881 }
882 result
883}
884
885fn collect_source_table(expr: &Expression, result: &mut Vec<(String, String)>) {
886 match expr {
887 Expression::Table(table) => {
888 let qname = table_ref_qualified_name(table);
889 let alias = table
890 .alias
891 .as_ref()
892 .map(|a| a.name.clone())
893 .unwrap_or_else(|| table.name.name.clone());
894 result.push((alias, qname));
895 }
896 Expression::Alias(alias) => collect_source_table(&alias.this, result),
897 Expression::Paren(paren) => collect_source_table(&paren.this, result),
898 _ => {}
899 }
900}
901
902fn leftmost_select(expr: &Expression) -> Option<&Select> {
903 match expr {
904 Expression::Select(select) => Some(select),
905 Expression::Union(union) => leftmost_select(&union.left),
906 Expression::Intersect(intersect) => leftmost_select(&intersect.left),
907 Expression::Except(except) => leftmost_select(&except.left),
908 Expression::Subquery(subquery) => leftmost_select(&subquery.this),
909 _ => None,
910 }
911}
912
913fn output_name(expr: &Expression) -> Option<String> {
914 match expr {
915 Expression::Alias(alias) => Some(alias.alias.name.clone()),
916 Expression::Column(col) => Some(col.name.name.clone()),
917 Expression::Identifier(id) => Some(id.name.clone()),
918 Expression::Annotated(a) => output_name(&a.this),
919 _ => None,
920 }
921}
922
923fn unalias(expr: &Expression) -> &Expression {
924 match expr {
925 Expression::Alias(alias) => &alias.this,
926 Expression::Annotated(a) => unalias(&a.this),
927 _ => expr,
928 }
929}
930
931fn is_star_expr(expr: &Expression) -> bool {
932 matches!(expr, Expression::Star(_))
933 || matches!(expr, Expression::Column(col) if col.name.name == "*")
934}
935
936fn star_qualifier(expr: &Expression) -> Option<String> {
937 match expr {
938 Expression::Star(star) => star.table.as_ref().map(|t| t.name.clone()),
939 Expression::Column(col) if col.name.name == "*" => {
940 col.table.as_ref().map(|t| t.name.clone())
941 }
942 _ => None,
943 }
944}
945
946fn dataset_from_expression(
947 expr: &Expression,
948 options: &OpenLineageOptions,
949) -> Result<OpenLineageDatasetId> {
950 match expr {
951 Expression::Table(table) => dataset_from_table_ref(table, options),
952 Expression::Identifier(id) => dataset_from_table_name(&id.name, options),
953 _ => Err(Error::unsupported(
954 "OpenLineage dataset extraction from non-table expression",
955 options.dialect.to_string(),
956 )),
957 }
958}
959
960fn dataset_from_table_ref(
961 table: &TableRef,
962 options: &OpenLineageOptions,
963) -> Result<OpenLineageDatasetId> {
964 dataset_from_table_name(&table_ref_qualified_name(table), options)
965}
966
967fn dataset_from_table_name(
968 table_name: &str,
969 options: &OpenLineageOptions,
970) -> Result<OpenLineageDatasetId> {
971 if let Some(mapped) = options.dataset_mappings.get(table_name) {
972 return Ok(mapped.clone());
973 }
974 let namespace = options.dataset_namespace.as_ref().ok_or_else(|| {
975 Error::parse(
976 format!(
977 "Missing datasetNamespace or explicit dataset mapping for table '{}'",
978 table_name
979 ),
980 0,
981 0,
982 0,
983 0,
984 )
985 })?;
986 Ok(OpenLineageDatasetId::new(namespace, table_name))
987}
988
989fn table_ref_qualified_name(table: &TableRef) -> String {
990 let mut parts = Vec::new();
991 if let Some(catalog) = &table.catalog {
992 parts.push(catalog.name.clone());
993 }
994 if let Some(schema) = &table.schema {
995 parts.push(schema.name.clone());
996 }
997 parts.push(table.name.name.clone());
998 parts.join(".")
999}
1000
1001fn collect_cte_aliases(expr: &Expression, dialect: DialectType) -> HashSet<String> {
1002 let mut aliases = HashSet::new();
1003 for node in expr.dfs() {
1004 match node {
1005 Expression::Select(select) => {
1006 if let Some(with) = &select.with {
1007 collect_with_aliases(with, dialect, &mut aliases);
1008 }
1009 }
1010 Expression::Union(union) => {
1011 if let Some(with) = &union.with {
1012 collect_with_aliases(with, dialect, &mut aliases);
1013 }
1014 }
1015 Expression::Intersect(intersect) => {
1016 if let Some(with) = &intersect.with {
1017 collect_with_aliases(with, dialect, &mut aliases);
1018 }
1019 }
1020 Expression::Except(except) => {
1021 if let Some(with) = &except.with {
1022 collect_with_aliases(with, dialect, &mut aliases);
1023 }
1024 }
1025 _ => {}
1026 }
1027 }
1028 aliases
1029}
1030
1031fn collect_with_aliases(with: &With, dialect: DialectType, aliases: &mut HashSet<String>) {
1032 for cte in &with.ctes {
1033 aliases.insert(normalize_identifier(&cte.alias.name, dialect, true));
1034 }
1035}
1036
1037fn normalize_identifier(name: &str, dialect: DialectType, is_table: bool) -> String {
1038 crate::schema::normalize_name(name, Some(dialect), is_table, true)
1039}
1040
1041fn openlineage_serialization_error(err: serde_json::Error) -> Error {
1042 Error::internal(format!("OpenLineage serialization failed: {err}"))
1043}
1044
1045fn deserialize_dialect_type<'de, D>(deserializer: D) -> std::result::Result<DialectType, D::Error>
1046where
1047 D: Deserializer<'de>,
1048{
1049 let value = String::deserialize(deserializer)?;
1050 value.parse::<DialectType>().map_err(de::Error::custom)
1051}
1052
// Unit tests for OpenLineage column lineage and event generation.
#[cfg(test)]
mod tests {
    use super::*;

    // Baseline options: PostgreSQL dialect, a default dataset namespace, an
    // explicit output dataset, and every job/run option populated.
    fn options() -> OpenLineageOptions {
        OpenLineageOptions {
            dialect: DialectType::PostgreSQL,
            producer: "https://github.com/tobilg/polyglot".to_string(),
            dataset_namespace: Some("postgres://warehouse".to_string()),
            output_dataset: Some(OpenLineageDatasetId::new(
                "postgres://warehouse",
                "analytics.out",
            )),
            job_namespace: Some("polyglot-tests".to_string()),
            job_name: Some("lineage-test".to_string()),
            event_time: Some("2026-05-18T00:00:00Z".to_string()),
            run_id: Some("3b452093-782c-4ef2-9c0c-aafe2aa6f34d".to_string()),
            event_type: Some(OpenLineageRunEventType::Complete),
            ..Default::default()
        }
    }

    // The dialect option accepts the alias "postgres" for PostgreSQL.
    #[test]
    fn deserializes_dialect_aliases_in_options() {
        let options: OpenLineageOptions =
            serde_json::from_str(r#"{"producer":"polyglot","dialect":"postgres"}"#)
                .expect("options");
        assert_eq!(options.dialect, DialectType::PostgreSQL);
    }

    // A bare column projection maps to its source column with subtype IDENTITY.
    #[test]
    fn emits_identity_column_lineage_for_select() {
        let result = openlineage_column_lineage("SELECT a FROM t", &options()).expect("lineage");
        let field = result.facet.fields.get("a").expect("field a");
        assert_eq!(field.input_fields.len(), 1);
        assert_eq!(field.input_fields[0].name, "t");
        assert_eq!(field.input_fields[0].field, "a");
        assert_eq!(field.input_fields[0].transformations[0].subtype, "IDENTITY");
    }

    // A column qualified by a table alias resolves to the underlying table.
    #[test]
    fn resolves_input_dataset_behind_table_alias() {
        let result = openlineage_column_lineage("SELECT o.total FROM orders o", &options())
            .expect("lineage");
        let field = result.facet.fields.get("total").expect("field total");
        assert_eq!(field.input_fields[0].name, "orders");
        assert_eq!(field.input_fields[0].field, "total");
    }

    // An expression over two columns lists both inputs with subtype TRANSFORMATION.
    #[test]
    fn emits_transformation_column_lineage_for_expression() {
        let result =
            openlineage_column_lineage("SELECT a + b AS c FROM t", &options()).expect("lineage");
        let field = result.facet.fields.get("c").expect("field c");
        assert_eq!(field.input_fields.len(), 2);
        assert!(field.input_fields.iter().any(|f| f.field == "a"));
        assert!(field.input_fields.iter().any(|f| f.field == "b"));
        assert!(field
            .input_fields
            .iter()
            .all(|f| f.transformations[0].subtype == "TRANSFORMATION"));
    }

    // An aggregate projection is reported with subtype AGGREGATION.
    #[test]
    fn emits_aggregation_column_lineage() {
        let result =
            openlineage_column_lineage("SELECT SUM(amount) AS total FROM orders", &options())
                .expect("lineage");
        let field = result.facet.fields.get("total").expect("field total");
        assert_eq!(field.input_fields[0].field, "amount");
        assert_eq!(
            field.input_fields[0].transformations[0].subtype,
            "AGGREGATION"
        );
    }

    // INSERT takes its output dataset from the target table, not from the options.
    #[test]
    fn infers_insert_output_dataset() {
        let mut opts = options();
        opts.output_dataset = None;
        let result =
            openlineage_column_lineage("INSERT INTO analytics.out SELECT a FROM raw.input", &opts)
                .expect("lineage");
        assert_eq!(result.outputs[0].name, "analytics.out");
        assert_eq!(result.inputs[0].name, "raw.input");
    }

    // An explicit INSERT column list renames the output field.
    #[test]
    fn maps_insert_target_columns_to_output_fields() {
        let mut opts = options();
        opts.output_dataset = None;
        let result = openlineage_column_lineage(
            "INSERT INTO analytics.out (target_a) SELECT source_a FROM raw.input",
            &opts,
        )
        .expect("lineage");
        let field = result.facet.fields.get("target_a").expect("target field");
        assert_eq!(field.input_fields[0].field, "source_a");
        assert!(!result.facet.fields.contains_key("source_a"));
    }

    // A plain SELECT without an output dataset option is rejected.
    #[test]
    fn pure_select_requires_output_dataset() {
        let mut opts = options();
        opts.output_dataset = None;
        let err = openlineage_column_lineage("SELECT a FROM t", &opts).unwrap_err();
        assert!(err.to_string().contains("outputDataset is required"));
    }

    // The job event carries the job namespace, the sql facet and the column lineage facet.
    #[test]
    fn emits_job_event_payload() {
        let result = openlineage_job_event("SELECT a FROM t", &options()).expect("event");
        assert_eq!(result.event["job"]["namespace"], "polyglot-tests");
        assert_eq!(
            result.event["job"]["facets"]["sql"]["_schemaURL"],
            SQL_JOB_FACET_SCHEMA_URL
        );
        assert_eq!(
            result.event["outputs"][0]["facets"]["columnLineage"]["fields"]["a"]["inputFields"][0]
                ["field"],
            "a"
        );
    }

    // The run event carries the serialized event type and the run id.
    #[test]
    fn emits_run_event_payload() {
        let result = openlineage_run_event("SELECT a FROM t", &options()).expect("event");
        assert_eq!(result.event["eventType"], "COMPLETE");
        assert_eq!(
            result.event["run"]["runId"],
            "3b452093-782c-4ef2-9c0c-aafe2aa6f34d"
        );
    }

    // Without schema metadata SELECT * yields no fields and a warning.
    #[test]
    fn select_star_without_schema_warns() {
        let result = openlineage_column_lineage("SELECT * FROM t", &options()).expect("lineage");
        assert!(result.facet.fields.is_empty());
        assert!(result
            .warnings
            .iter()
            .any(|w| w.code == "W_STAR_WITHOUT_SCHEMA"));
    }

    // With schema metadata SELECT * expands to the table's columns.
    #[test]
    fn select_star_with_schema_expands_fields() {
        let mut opts = options();
        opts.schema = Some(ValidationSchema {
            strict: None,
            tables: vec![crate::validation::SchemaTable {
                name: "t".to_string(),
                schema: None,
                columns: vec![
                    crate::validation::SchemaColumn {
                        name: "a".to_string(),
                        data_type: "INT".to_string(),
                        nullable: None,
                        primary_key: false,
                        unique: false,
                        references: None,
                    },
                    crate::validation::SchemaColumn {
                        name: "b".to_string(),
                        data_type: "TEXT".to_string(),
                        nullable: None,
                        primary_key: false,
                        unique: false,
                        references: None,
                    },
                ],
                aliases: vec![],
                primary_key: vec![],
                unique_keys: vec![],
                foreign_keys: vec![],
            }],
        });

        let result = openlineage_column_lineage("SELECT * FROM t", &opts).expect("lineage");
        assert!(result.facet.fields.contains_key("a"));
        assert!(result.facet.fields.contains_key("b"));
    }
}