use std::collections::{HashMap, HashSet, VecDeque};
use serde::{Deserialize, Serialize};
use crate::diagnostics::{codes, Diagnostic, DiagnosticCategory, DiagnosticStage, Severity};
use crate::model::TransformationContract;
/// One edge of the lineage graph: a single output together with the inputs
/// it is mapped from.
///
/// Edges are built from the contract's lineage mappings (see `build_graph`).
/// Field names are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LineageEdge {
/// Identifier of the output this edge produces.
pub output: String,
/// Identifiers of the inputs that feed `output`.
pub inputs: Vec<String>,
}
/// Governance information attached to a lineage report.
///
/// Populated by `governance_from` from the contract's metadata; the derived
/// `Default` leaves both fields as `None`. Field names are serialized in
/// camelCase and `None` fields are omitted from the serialized output.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LineageGovernance {
/// Owner recorded in the contract's governance metadata, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub owner: Option<String>,
/// Steward recorded in the contract's governance metadata, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub steward: Option<String>,
}
/// Result of a lineage analysis run, as returned by `analyze` and
/// `analyze_with_options`.
///
/// Field names are serialized in camelCase. Optional query results are
/// omitted from the serialized output when absent, and `diagnostics` is
/// omitted when empty.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LineageAnalysisReport {
/// Every lineage edge derived from the contract.
pub graph: Vec<LineageEdge>,
/// Answer to the dependency query; `None` when no output was queried.
#[serde(skip_serializing_if = "Option::is_none")]
pub dependency: Option<LineageDependencyResult>,
/// Answer to the impact query; `None` when no input was queried.
#[serde(skip_serializing_if = "Option::is_none")]
pub impact: Option<LineageImpactResult>,
/// Owner/steward information copied from the contract metadata.
pub governance: LineageGovernance,
/// Diagnostics collected during analysis. `analyze_with_options` records a
/// warning here when a queried identifier cannot be resolved. A missing
/// field deserializes to an empty list.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub diagnostics: Vec<Diagnostic>,
}
/// Answer to a dependency query: which inputs a given output is mapped from.
///
/// Field names are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LineageDependencyResult {
/// The output identifier that was queried.
pub output: String,
/// Inputs of the first graph edge whose output equals `output`, as filled
/// in by `analyze_with_options`; empty when no such edge exists.
pub inputs: Vec<String>,
}
/// Answer to an impact query: which outputs are affected by a given input.
///
/// Field names are serialized in camelCase.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LineageImpactResult {
/// The input identifier that was queried.
pub input: String,
/// Outputs reachable from `input` through the lineage graph, as computed
/// by `transitive_impact` (sorted, without duplicates).
pub outputs: Vec<String>,
}
/// Analyzes the lineage of `contract` without running any query.
///
/// Equivalent to calling [`analyze_with_options`] with neither an impact
/// input nor a dependency output, so the returned report has `impact` and
/// `dependency` set to `None`.
#[must_use]
pub fn analyze(contract: &TransformationContract) -> LineageAnalysisReport {
    // Spell out that both optional queries are disabled.
    let (impact_input, dependency_output) = (None, None);
    analyze_with_options(contract, impact_input, dependency_output)
}
#[must_use]
pub fn analyze_with_options(
contract: &TransformationContract,
impact_input: Option<&str>,
dependency_output: Option<&str>,
) -> LineageAnalysisReport {
let graph = build_graph(contract);
let reverse = reverse_graph(&graph);
let mut diagnostics = Vec::new();
let input_ids: HashSet<_> = contract
.inputs
.iter()
.map(|input| input.id.as_str())
.collect();
let output_ids: HashSet<_> = contract
.outputs
.iter()
.map(|output| output.id.as_str())
.collect();
if let Some(input) = impact_input {
if !input_ids.contains(input)
&& !graph
.iter()
.any(|edge| edge.inputs.iter().any(|id| id == input))
{
diagnostics.push(
Diagnostic::new(
codes::UNRESOLVED_REFERENCE,
Severity::Warning,
DiagnosticStage::Analysis,
DiagnosticCategory::Reference,
format!("impact query input '{input}' was not found in the contract"),
)
.with_object_ref(format!("inputs.{input}")),
);
}
}
if let Some(output) = dependency_output {
if !output_ids.contains(output) && !graph.iter().any(|edge| edge.output == output) {
diagnostics.push(
Diagnostic::new(
codes::UNRESOLVED_REFERENCE,
Severity::Warning,
DiagnosticStage::Analysis,
DiagnosticCategory::Reference,
format!("dependency query output '{output}' was not found in the contract"),
)
.with_object_ref(format!("outputs.{output}")),
);
}
}
LineageAnalysisReport {
dependency: dependency_output.map(|output| LineageDependencyResult {
output: output.into(),
inputs: graph
.iter()
.find(|edge| edge.output == output)
.map(|edge| edge.inputs.clone())
.unwrap_or_default(),
}),
impact: impact_input.map(|input| LineageImpactResult {
input: input.into(),
outputs: transitive_impact(input, &reverse),
}),
governance: governance_from(contract),
graph,
diagnostics,
}
}
/// Converts the contract's lineage mappings into graph edges.
///
/// Each mapping becomes one [`LineageEdge`], in mapping order. A contract
/// without a lineage section yields an empty graph.
fn build_graph(contract: &TransformationContract) -> Vec<LineageEdge> {
    match contract.lineage.as_ref() {
        None => Vec::new(),
        Some(lineage) => {
            let mut edges = Vec::new();
            for mapping in lineage.mappings.iter() {
                edges.push(LineageEdge {
                    output: mapping.output.clone(),
                    inputs: mapping.inputs.clone(),
                });
            }
            edges
        }
    }
}
/// Inverts the lineage graph into an input -> outputs index.
///
/// For every edge, each of its inputs gains the edge's output in its list.
/// Lists follow edge order and may contain the same output more than once
/// if several edges (or a repeated input) link the same pair.
fn reverse_graph(graph: &[LineageEdge]) -> HashMap<String, Vec<String>> {
    graph
        .iter()
        // Flatten every edge into its (input, output) pairs.
        .flat_map(|edge| edge.inputs.iter().map(move |input| (input, &edge.output)))
        .fold(
            HashMap::<String, Vec<String>>::new(),
            |mut index, (input, output)| {
                index
                    .entry(input.clone())
                    .or_insert_with(Vec::new)
                    .push(output.clone());
                index
            },
        )
}
/// Collects every output reachable from `input` by following the reverse
/// lineage index breadth-first.
///
/// `reverse` maps an identifier to the outputs directly derived from it.
/// The result is sorted, free of duplicates, and never contains `input`
/// itself, even when the graph has a cycle leading back to it. An `input`
/// that is not a key of `reverse` yields an empty list.
fn transitive_impact(input: &str, reverse: &HashMap<String, Vec<String>>) -> Vec<String> {
    // Borrow identifiers from the index instead of cloning each visited node.
    let mut seen: HashSet<&str> = HashSet::new();
    let mut queue: VecDeque<&str> = VecDeque::new();
    queue.push_back(input);

    while let Some(current) = queue.pop_front() {
        if let Some(outputs) = reverse.get(current) {
            for output in outputs {
                // `insert` returns false for already-visited nodes, which
                // also guarantees termination on cyclic graphs.
                if seen.insert(output.as_str()) {
                    queue.push_back(output.as_str());
                }
            }
        }
    }

    // A cycle may have led back to the starting point; it is not its own impact.
    seen.remove(input);

    let mut outputs: Vec<String> = seen.into_iter().map(str::to_owned).collect();
    // Equal strings are indistinguishable, so an unstable sort is sufficient.
    outputs.sort_unstable();
    outputs
}
fn governance_from(contract: &TransformationContract) -> LineageGovernance {
contract
.metadata
.as_ref()
.and_then(|m| m.governance.as_ref())
.map(|g| LineageGovernance {
owner: g.owner.clone(),
steward: g.steward.clone(),
})
.unwrap_or_default()
}