use crate::graph::{ConstructExecutor, Graph};
use crate::pipeline_engine::pass::{Pass, PassContext, PassResult, PassType};
use crate::utils::error::{Error, Result};
use crate::validation::shacl::{ShaclShapeSet, ShapeLoader};
use crate::validation::validator::SparqlValidator;
use crate::validation::violation::ValidationResult;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::path::Path;
use std::time::Instant;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NormalizationRule {
pub name: String,
pub construct: String,
pub order: i32,
pub description: Option<String>,
pub when: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NormalizationReceipt {
pub input_hash: String,
pub pre_validation: ValidationSummary,
pub post_validation: ValidationSummary,
pub owl_triples_materialized: usize,
pub total_triples: usize,
pub rules_executed: Vec<RuleExecution>,
pub duration_ms: u64,
pub success: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ValidationSummary {
pub passed: bool,
pub violation_count: usize,
pub duration_ms: u64,
}
impl From<&ValidationResult> for ValidationSummary {
fn from(result: &ValidationResult) -> Self {
Self {
passed: result.passed,
violation_count: result.violation_count,
duration_ms: result.duration_ms,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuleExecution {
pub name: String,
pub query_hash: String,
pub triples_materialized: usize,
pub skipped: bool,
}
#[derive(Debug, Clone)]
pub struct NormalizationPass {
rules: Vec<NormalizationRule>,
shacl_shapes: Option<ShaclShapeSet>,
enable_shacl_gate: bool,
receipt: Option<NormalizationReceipt>,
}
impl NormalizationPass {
pub fn new() -> Self {
Self {
rules: Vec::new(),
shacl_shapes: None,
enable_shacl_gate: true,
receipt: None,
}
}
pub fn receipt(&self) -> Option<&NormalizationReceipt> {
self.receipt.as_ref()
}
pub fn parse_ttl(&self, ttl_path: &Path) -> Result<Graph> {
let content = std::fs::read_to_string(ttl_path).map_err(|e| {
Error::new(&format!(
"Failed to read TTL file '{}': {}",
ttl_path.display(),
e
))
})?;
let graph = Graph::new()?;
graph.insert_turtle(&content).map_err(|e| {
Error::new(&format!(
"Failed to parse TTL '{}': {}",
ttl_path.display(),
e
))
})?;
Ok(graph)
}
fn hash_query(&self, query: &str) -> String {
let hash = Sha256::digest(query.as_bytes());
format!("{:x}", hash)
}
pub fn add_rule(&mut self, rule: NormalizationRule) {
self.rules.push(rule);
self.rules.sort_by_key(|r| r.order);
}
pub fn with_rules(mut self, rules: Vec<NormalizationRule>) -> Self {
self.rules = rules;
self.rules.sort_by_key(|r| r.order);
self
}
pub fn with_shacl_shapes(mut self, shapes: ShaclShapeSet) -> Self {
self.shacl_shapes = Some(shapes);
self
}
pub fn load_shacl_shapes(&mut self, ctx: &PassContext<'_>) -> Result<()> {
let loader = ShapeLoader::new();
let shapes = loader
.load(ctx.graph)
.map_err(|e| Error::new(&e.to_string()))?;
self.shacl_shapes = Some(shapes);
Ok(())
}
pub fn with_shacl_gate(mut self, enabled: bool) -> Self {
self.enable_shacl_gate = enabled;
self
}
fn validate_shacl_gate(&self, ctx: &PassContext<'_>) -> Result<ValidationResult> {
if !self.enable_shacl_gate {
return Ok(ValidationResult::pass(0));
}
if let Some(ref shapes) = self.shacl_shapes {
if shapes.is_empty() {
return Ok(ValidationResult::pass(0));
}
let validator = SparqlValidator::new();
let report = validator
.validate_shapes(ctx.graph, shapes)
.map_err(|e| Error::new(&e.to_string()))?;
if !report.passed {
let violation_count = report.violations.len();
let messages: Vec<String> = report
.violations
.iter()
.take(5)
.map(|v| format!(" - {}: {}", v.constraint_type, v.message))
.collect();
return Err(Error::new(&format!(
"🚨 SHACL Validation Failed: {} violation(s) detected\n\n\
μ₁:normalization STOPPED THE LINE (Andon Protocol)\n\n\
First {} violations:\n{}\n\n\
Fix violations before proceeding.\n\n\
Constraint types violated: {:?}",
violation_count,
messages.len(),
messages.join("\n"),
report
.violations
.iter()
.map(|v| v.constraint_type)
.collect::<std::collections::HashSet<_>>()
)));
}
Ok(report)
} else {
Ok(ValidationResult::pass(0))
}
}
fn should_execute_rule(&self, ctx: &PassContext<'_>, rule: &NormalizationRule) -> Result<bool> {
if let Some(ref when_query) = rule.when {
let results = ctx.graph.query(when_query)?;
match results {
oxigraph::sparql::QueryResults::Boolean(b) => Ok(b),
_ => Ok(true), }
} else {
Ok(true) }
}
fn count_graph_triples(&self, graph: &Graph) -> Result<usize> {
let query = "SELECT (COUNT(*) as ?count) WHERE { ?s ?p ?o }";
let results = graph.query(query)?;
match results {
oxigraph::sparql::QueryResults::Solutions(mut solutions) => {
if let Some(Ok(solution)) = solutions.next() {
if let Some(term) = solution.get("count") {
let count_str = term.to_string();
let count_str = count_str
.trim_start_matches('"')
.split('"')
.next()
.unwrap_or("0");
return count_str.parse::<usize>().map_err(|e| {
Error::new(&format!("Failed to parse triple count: {}", e))
});
}
}
Ok(0)
}
_ => Ok(0),
}
}
}
impl Default for NormalizationPass {
fn default() -> Self {
Self::new()
}
}
impl Pass for NormalizationPass {
fn pass_type(&self) -> PassType {
PassType::Normalization
}
fn name(&self) -> &str {
"μ₁:normalization"
}
fn execute(&self, ctx: &mut PassContext<'_>) -> Result<PassResult> {
let start = Instant::now();
let mut total_triples = 0;
let mut rules_executed = Vec::new();
let _initial_triples = self.count_graph_triples(ctx.graph)?;
let input_hash = "".to_string();
let pre_validation = self.validate_shacl_gate(ctx)?;
let executor = ConstructExecutor::new(ctx.graph);
for rule in &self.rules {
let should_execute = self.should_execute_rule(ctx, rule)?;
if !should_execute {
rules_executed.push(RuleExecution {
name: rule.name.clone(),
query_hash: self.hash_query(&rule.construct),
triples_materialized: 0,
skipped: true,
});
continue;
}
let triples_added = executor.execute_and_materialize(&rule.construct)?;
total_triples += triples_added;
rules_executed.push(RuleExecution {
name: rule.name.clone(),
query_hash: self.hash_query(&rule.construct),
triples_materialized: triples_added,
skipped: false,
});
}
let post_validation = self.validate_shacl_gate(ctx)?;
let duration = start.elapsed();
let final_triples = self.count_graph_triples(ctx.graph)?;
let _receipt = NormalizationReceipt {
input_hash,
pre_validation: ValidationSummary::from(&pre_validation),
post_validation: ValidationSummary::from(&post_validation),
owl_triples_materialized: total_triples,
total_triples: final_triples,
rules_executed,
duration_ms: duration.as_millis() as u64,
success: true,
};
Ok(PassResult::success()
.with_triples(total_triples)
.with_duration(duration))
}
}
impl NormalizationPass {
pub fn with_standard_rules() -> Self {
let mut pass = Self::new();
pass.add_rule(NormalizationRule {
name: "owl-inverse-properties".to_string(),
construct: r"
PREFIX owl: <http://www.w3.org/2002/07/owl#>
CONSTRUCT {
?y ?invProp ?x .
}
WHERE {
?prop owl:inverseOf ?invProp .
?x ?prop ?y .
}
"
.to_string(),
order: 1,
description: Some("OWL: Materialize inverse property relationships".to_string()),
when: None,
});
pass.add_rule(NormalizationRule {
name: "rdfs-subclass-inference".to_string(),
construct: r"
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
CONSTRUCT {
?instance rdf:type ?superClass .
}
WHERE {
?instance rdf:type ?class .
?class rdfs:subClassOf+ ?superClass .
FILTER (?class != ?superClass)
}
"
.to_string(),
order: 2,
description: Some("RDFS: Materialize subclass type relationships".to_string()),
when: None,
});
pass.add_rule(NormalizationRule {
name: "rdfs-domain-range-inference".to_string(),
construct: r"
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
CONSTRUCT {
?subject rdf:type ?domainClass .
?object rdf:type ?rangeClass .
}
WHERE {
?subject ?prop ?object .
OPTIONAL { ?prop rdfs:domain ?domainClass . }
OPTIONAL { ?prop rdfs:range ?rangeClass . }
FILTER (isIRI(?object))
}
"
.to_string(),
order: 3,
description: Some("RDFS: Infer types from domain and range declarations".to_string()),
when: None,
});
pass.add_rule(NormalizationRule {
name: "owl-symmetric-properties".to_string(),
construct: r"
PREFIX owl: <http://www.w3.org/2002/07/owl#>
CONSTRUCT {
?y ?prop ?x .
}
WHERE {
?prop a owl:SymmetricProperty .
?x ?prop ?y .
}
"
.to_string(),
order: 4,
description: Some("OWL: Materialize symmetric property relationships".to_string()),
when: None,
});
pass.add_rule(NormalizationRule {
name: "owl-transitive-properties".to_string(),
construct: r"
PREFIX owl: <http://www.w3.org/2002/07/owl#>
CONSTRUCT {
?x ?prop ?z .
}
WHERE {
?prop a owl:TransitiveProperty .
?x ?prop ?y .
?y ?prop ?z .
}
"
.to_string(),
order: 5,
description: Some("OWL: Materialize transitive property relationships".to_string()),
when: None,
});
pass.add_rule(NormalizationRule {
name: "owl-equivalent-classes".to_string(),
construct: r"
PREFIX owl: <http://www.w3.org/2002/07/owl#>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
CONSTRUCT {
?instance rdf:type ?equivClass .
}
WHERE {
?instance rdf:type ?class .
?class owl:equivalentClass ?equivClass .
FILTER (?class != ?equivClass)
}
"
.to_string(),
order: 6,
description: Some("OWL: Materialize equivalent class memberships".to_string()),
when: None,
});
pass
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::Graph;
use std::path::PathBuf;
#[test]
fn test_normalization_pass_empty() {
let graph = Graph::new().unwrap();
let pass = NormalizationPass::new();
let mut ctx = PassContext::new(&graph, PathBuf::new(), PathBuf::new());
let result = pass.execute(&mut ctx).unwrap();
assert!(result.success);
assert_eq!(result.triples_added, 0);
}
#[test]
fn test_normalization_with_rule() {
let graph = Graph::new().unwrap();
graph
.insert_turtle(
r#"
@prefix ex: <http://example.org/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
@prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> .
ex:Person a rdfs:Class .
ex:Student rdfs:subClassOf ex:Person .
ex:alice rdf:type ex:Student .
"#,
)
.unwrap();
let mut pass = NormalizationPass::new();
pass.add_rule(NormalizationRule {
name: "noop".to_string(),
construct: r#"
CONSTRUCT {}
WHERE { ?s ?p ?o }
"#
.to_string(),
order: 1,
description: Some("No-op rule".to_string()),
when: None,
});
let mut ctx = PassContext::new(&graph, PathBuf::new(), PathBuf::new());
let result = pass.execute(&mut ctx).unwrap();
assert!(result.success);
assert_eq!(result.triples_added, 0);
}
#[test]
fn test_rule_ordering() {
let mut pass = NormalizationPass::new();
pass.add_rule(NormalizationRule {
name: "third".to_string(),
construct: "CONSTRUCT {} WHERE {}".to_string(),
order: 3,
description: None,
when: None,
});
pass.add_rule(NormalizationRule {
name: "first".to_string(),
construct: "CONSTRUCT {} WHERE {}".to_string(),
order: 1,
description: None,
when: None,
});
pass.add_rule(NormalizationRule {
name: "second".to_string(),
construct: "CONSTRUCT {} WHERE {}".to_string(),
order: 2,
description: None,
when: None,
});
assert_eq!(pass.rules[0].name, "first");
assert_eq!(pass.rules[1].name, "second");
assert_eq!(pass.rules[2].name, "third");
}
}