use crate::graph::{Graph, GraphExport};
use crate::marketplace::trust::TrustTier;
use crate::pack_resolver::{PackResolver, ResolvedPacks};
use crate::pipeline_engine::epoch::Epoch;
use crate::pipeline_engine::guard::GuardSet;
use crate::pipeline_engine::pass::{Pass, PassContext, PassExecution, PassResult};
use crate::pipeline_engine::passes::{
CanonicalizationPass, EmissionPass, ExtractionPass, NormalizationPass, ReceiptGenerationPass,
};
use crate::pipeline_engine::receipt::{
BuildReceipt, BundleExpansionRef, OutputFile, PackProvenance, ReceiptPolicies,
};
use crate::pipeline_engine::vocabulary::VocabularyRegistry;
use crate::utils::error::{Error, Result};
use oxigraph::io::RdfFormat;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::path::{Path, PathBuf};
use std::time::Instant;
#[allow(dead_code)]
fn stage_pack_templates(
base_path: &Path, templates: &[crate::pack_resolver::TemplateDef],
) -> Result<Vec<String>> {
let stage = base_path.join(".ggen").join("pack-stage");
let mut names = Vec::new();
for t in templates {
let dest = stage.join(&t.path);
if let Some(p) = dest.parent() {
std::fs::create_dir_all(p)
.map_err(|e| Error::new(&format!("Failed to create pack-stage dir: {}", e)))?;
}
std::fs::write(&dest, &t.content)
.map_err(|e| Error::new(&format!("Failed to write staged template: {}", e)))?;
names.push(t.path.display().to_string());
}
Ok(names)
}
/// Selects which verification checks `StagedPipeline::run` performs.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum VerifyMode {
/// Perform no verification (this is the default).
#[default]
None,
/// Compare the current input epoch with the one stored in the previous receipt.
VerifyInputs,
/// Check the generated outputs against the freshly built receipt.
VerifyOutputs,
/// Perform both the input and the output checks.
Full,
}
/// Settings for a single `StagedPipeline`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineConfig {
/// Project name handed to the pass context.
pub project_name: String,
/// Project version handed to the pass context.
pub project_version: String,
/// Directory that ontology sources, `output_dir` and `receipt_path` are joined onto.
pub base_path: PathBuf,
/// Ontology files to load, joined onto `base_path` when read.
pub ontology_sources: Vec<PathBuf>,
/// Output directory, joined onto `base_path`; created by the pipeline if missing.
pub output_dir: PathBuf,
/// Where the receipt is written (joined onto `base_path`); `None` skips writing it.
pub receipt_path: Option<PathBuf>,
/// Which verification checks to perform during a run.
pub verify_mode: VerifyMode,
/// Receipt of an earlier build, read when input verification is enabled.
/// The pipeline reads this path as given, without joining it onto `base_path`.
pub previous_receipt: Option<PathBuf>,
/// Toolchain version recorded in the receipt.
pub toolchain_version: String,
/// Whether pack resolution is enabled.
/// NOTE(review): when this field is absent during deserialization the serde
/// default is `false`, whereas `PipelineConfig::new` sets it to `true` —
/// confirm which default is intended.
#[serde(default)]
pub enable_pack_resolution: bool,
}
impl PipelineConfig {
    /// Builds a configuration for the given project with default settings:
    /// base and output directories of `.`, no ontology sources, a receipt at
    /// `.ggen/receipt.json`, no verification, no previous receipt, this crate's
    /// package version as toolchain version, and pack resolution enabled.
    pub fn new(project_name: impl Into<String>, project_version: impl Into<String>) -> Self {
        let name: String = project_name.into();
        let version: String = project_version.into();
        Self {
            project_name: name,
            project_version: version,
            base_path: ".".into(),
            ontology_sources: vec![],
            output_dir: ".".into(),
            receipt_path: Some(".ggen/receipt.json".into()),
            verify_mode: VerifyMode::default(),
            previous_receipt: None,
            toolchain_version: String::from(env!("CARGO_PKG_VERSION")),
            enable_pack_resolution: true,
        }
    }

    /// Sets the base directory.
    pub fn with_base_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            base_path: path.into(),
            ..self
        }
    }

    /// Appends one ontology source.
    pub fn with_ontology(self, path: impl Into<PathBuf>) -> Self {
        self.with_ontologies(vec![path.into()])
    }

    /// Appends several ontology sources, keeping their order.
    pub fn with_ontologies(mut self, paths: Vec<PathBuf>) -> Self {
        for path in paths {
            self.ontology_sources.push(path);
        }
        self
    }

    /// Sets the output directory.
    pub fn with_output_dir(self, path: impl Into<PathBuf>) -> Self {
        Self {
            output_dir: path.into(),
            ..self
        }
    }

    /// Sets the path the receipt is written to.
    pub fn with_receipt_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            receipt_path: Some(path.into()),
            ..self
        }
    }

    /// Sets the verification mode.
    pub fn with_verify_mode(self, mode: VerifyMode) -> Self {
        Self {
            verify_mode: mode,
            ..self
        }
    }

    /// Sets the receipt of a previous build to verify inputs against.
    pub fn with_previous_receipt(self, path: impl Into<PathBuf>) -> Self {
        Self {
            previous_receipt: Some(path.into()),
            ..self
        }
    }
}
/// A pipeline that runs normalization, extraction, emission and
/// canonicalization passes over a graph and produces a build receipt.
pub struct StagedPipeline {
/// Configuration this pipeline was created from.
config: PipelineConfig,
/// Graph the passes operate on; filled from ontology files or replaced by the
/// merged pack ontology when pack resolution succeeds.
graph: Graph,
/// Epoch describing the current inputs; `None` until ontologies or packs are loaded.
epoch: Option<Epoch>,
/// Execution records of the passes that have been run.
executed_passes: Vec<PassExecution>,
/// Files reported by the pass context after the passes have run.
generated_files: Vec<PathBuf>,
/// Registry used to validate the namespaces found in ontology sources.
vocabulary_registry: VocabularyRegistry,
/// Guard set configured for this pipeline.
/// NOTE(review): stored and replaceable via `with_guards`, but `run` does not
/// appear to consult it in this file — confirm where guards are enforced.
guard_set: GuardSet,
/// Pack resolver, when one is available for this pipeline.
pack_resolver: Option<PackResolver>,
/// Result of the most recent successful pack resolution.
resolved_packs: Option<ResolvedPacks>,
/// Normalization pass (first stage).
normalization: NormalizationPass,
/// Extraction pass (second stage).
extraction: ExtractionPass,
/// Emission pass (third stage).
emission: EmissionPass,
/// Canonicalization pass (fourth stage).
canonicalization: CanonicalizationPass,
/// Receipt generation pass; constructed in `new` but not otherwise used in
/// this file, hence the `dead_code` allowance.
#[allow(dead_code)]
receipt_gen: ReceiptGenerationPass,
}
impl StagedPipeline {
/// Creates a pipeline from `config` with an empty graph, the standard
/// vocabularies, the default guard set and the standard passes.
///
/// A pack resolver is only created when `config.enable_pack_resolution` is
/// set. Creating it is best-effort: if the resolver cannot be constructed the
/// pipeline simply runs without pack resolution.
///
/// # Errors
///
/// Fails when the graph cannot be created.
pub fn new(config: PipelineConfig) -> Result<Self> {
    let graph = Graph::new()?;

    // Honour the configuration flag instead of always attempting resolution.
    let pack_resolver = if config.enable_pack_resolution {
        PackResolver::new(&config.base_path).ok()
    } else {
        None
    };

    // Built before `config` is moved into the struct so no full clone is needed.
    let receipt_gen = ReceiptGenerationPass::new(&config.toolchain_version)
        .with_receipt_path(config.receipt_path.clone().unwrap_or_default());

    Ok(Self {
        config,
        graph,
        epoch: None,
        executed_passes: Vec::new(),
        generated_files: Vec::new(),
        vocabulary_registry: VocabularyRegistry::with_standard_vocabularies(),
        guard_set: GuardSet::default_v26(),
        pack_resolver,
        resolved_packs: None,
        normalization: NormalizationPass::with_standard_rules(),
        extraction: ExtractionPass::with_standard_rules(),
        emission: EmissionPass::new(),
        canonicalization: CanonicalizationPass::new(),
        receipt_gen,
    })
}
/// Replaces the normalization pass and returns the updated pipeline.
pub fn with_normalization(self, pass: NormalizationPass) -> Self {
    let mut pipeline = self;
    pipeline.normalization = pass;
    pipeline
}
/// Replaces the extraction pass and returns the updated pipeline.
pub fn with_extraction(self, pass: ExtractionPass) -> Self {
    let mut pipeline = self;
    pipeline.extraction = pass;
    pipeline
}
/// Replaces the emission pass and returns the updated pipeline.
pub fn with_emission(self, pass: EmissionPass) -> Self {
    let mut pipeline = self;
    pipeline.emission = pass;
    pipeline
}
/// Replaces the vocabulary registry used to validate ontology namespaces.
pub fn with_vocabulary_registry(self, registry: VocabularyRegistry) -> Self {
    let mut pipeline = self;
    pipeline.vocabulary_registry = registry;
    pipeline
}
/// Replaces the guard set and returns the updated pipeline.
pub fn with_guards(self, guards: GuardSet) -> Self {
    let mut pipeline = self;
    pipeline.guard_set = guards;
    pipeline
}
/// Loads every configured ontology source into the graph and records the
/// resulting input epoch.
///
/// Each source is read relative to the configured base path, its namespaces
/// are validated against the vocabulary registry, and its Turtle content is
/// inserted into the graph.
///
/// Returns a reference to the newly stored epoch.
///
/// # Errors
///
/// Fails when the epoch cannot be created, a source cannot be read, a
/// namespace is rejected, or the Turtle content cannot be inserted. On failure
/// the stored epoch is left untouched, so a later `run` does not mistake a
/// partially loaded graph for a complete one.
pub fn load_ontologies(&mut self) -> Result<&Epoch> {
    let epoch = Epoch::create(&self.config.base_path, &self.config.ontology_sources)?;

    for source in &self.config.ontology_sources {
        let full_path = self.config.base_path.join(source);
        let content = std::fs::read_to_string(&full_path).map_err(|e| {
            Error::new(&format!(
                "Failed to read ontology '{}': {}",
                full_path.display(),
                e
            ))
        })?;
        let namespaces = VocabularyRegistry::extract_namespaces(&content);
        self.vocabulary_registry.validate_namespaces(&namespaces)?;
        self.graph.insert_turtle(&content)?;
    }

    // Commit the epoch only once every source has been loaded successfully.
    Ok(&*self.epoch.insert(epoch))
}
/// Returns the packs resolved by the most recent run, if any.
pub fn resolved_packs(&self) -> Option<&ResolvedPacks> {
    Option::as_ref(&self.resolved_packs)
}
pub fn verify_inputs(&self, previous_epoch: &Epoch) -> Result<bool> {
if let Some(ref epoch) = self.epoch {
Ok(epoch.id == previous_epoch.id)
} else {
Err(Error::new("No epoch loaded. Call load_ontologies() first."))
}
}
#[allow(dead_code)]
fn run_pass<P: Pass>(&mut self, pass: &P, ctx: &mut PassContext<'_>) -> Result<PassResult> {
let start = Instant::now();
let result = pass.execute(ctx)?;
let duration = start.elapsed();
let mut execution = pass.create_execution_record(&result);
execution.duration_ms = duration.as_millis() as u64;
self.executed_passes.push(execution);
Ok(result)
}
pub fn run(&mut self) -> Result<BuildReceipt> {
let _pipeline_start = Instant::now();
if let Some(ref resolver) = self.pack_resolver {
match resolver.resolve() {
Ok(resolved) => {
self.graph = resolved.merged_ontology.clone();
self.resolved_packs = Some(resolved);
}
Err(e) if e.to_string().contains("Lockfile not found") => {
tracing::debug!("No lockfile found, skipping pack resolution");
}
Err(e) => {
return Err(e);
}
}
}
if self.epoch.is_none() && self.resolved_packs.is_some() {
let turtle = GraphExport::new(&self.graph)
.write_to_string(RdfFormat::Turtle)
.map_err(|e| Error::new(&format!("Pack graph export failed: {}", e)))?;
let digest = format!("{:x}", Sha256::digest(turtle.as_bytes()));
let triple_count = self.graph.len();
self.epoch = Some(Epoch::from_pack_merged_substrate(digest, triple_count));
}
if self.epoch.is_none() && self.resolved_packs.is_none() {
self.load_ontologies()?;
}
if matches!(
self.config.verify_mode,
VerifyMode::VerifyInputs | VerifyMode::Full
) {
if let Some(ref receipt_path) = self.config.previous_receipt {
let previous_receipt = BuildReceipt::read_from_file(receipt_path)?;
if self.epoch.as_ref().map(|e| &e.id) != Some(&previous_receipt.epoch_id) {
return Err(Error::new(
"Input epoch does not match previous receipt. Inputs have changed.",
));
}
}
}
let output_dir = self.config.base_path.join(&self.config.output_dir);
std::fs::create_dir_all(&output_dir).map_err(|e| {
Error::new(&format!(
"Failed to create output directory '{}': {}",
output_dir.display(),
e
))
})?;
let normalization = self.normalization.clone();
let mut extraction = self.extraction.clone();
let mut emission = self.emission.clone();
if let Some(ref rp) = self.resolved_packs {
extraction.extend_with_pack_construct_queries(&rp.queries)?;
emission.extend_with_pack_templates(&rp.templates)?;
}
let canonicalization = self.canonicalization.clone();
let mut ctx = PassContext::new(&self.graph, self.config.base_path.clone(), output_dir)
.with_project(
self.config.project_name.clone(),
self.config.project_version.clone(),
);
let start = Instant::now();
let span = tracing::info_span!(
"pipeline.load",
"operation.name" = "pipeline.load",
"operation.type" = "pipeline",
"pipeline.stage" = "mu1",
);
let _guard = span.enter();
let norm_result = normalization.execute(&mut ctx)?;
let mut execution = normalization.create_execution_record(&norm_result);
execution.duration_ms = start.elapsed().as_millis() as u64;
self.executed_passes.push(execution);
if !norm_result.success {
return Err(Error::new(&format!(
"Normalization failed: {:?}",
norm_result.error
)));
}
let elapsed = start.elapsed();
tracing::Span::current().record("pipeline.duration_ms", elapsed.as_millis() as u64);
tracing::info!(
stage = "mu1",
elapsed_ms = elapsed.as_millis() as u64,
"Pipeline stage completed"
);
drop(_guard);
let start = Instant::now();
let span = tracing::info_span!(
"pipeline.extract",
"operation.name" = "pipeline.extract",
"operation.type" = "pipeline",
"pipeline.stage" = "mu2",
);
let _guard = span.enter();
let extract_result = extraction.execute(&mut ctx)?;
let mut execution = extraction.create_execution_record(&extract_result);
execution.duration_ms = start.elapsed().as_millis() as u64;
self.executed_passes.push(execution);
if !extract_result.success {
return Err(Error::new(&format!(
"Extraction failed: {:?}",
extract_result.error
)));
}
let elapsed = start.elapsed();
tracing::Span::current().record("pipeline.duration_ms", elapsed.as_millis() as u64);
tracing::info!(
stage = "mu2",
elapsed_ms = elapsed.as_millis() as u64,
"Pipeline stage completed"
);
drop(_guard);
if let Some(ref rp) = self.resolved_packs {
let _staged = stage_pack_templates(&self.config.base_path, &rp.templates)?;
}
let start = Instant::now();
let span = tracing::info_span!(
"pipeline.generate",
"operation.name" = "pipeline.generate",
"operation.type" = "pipeline",
"pipeline.stage" = "mu3",
);
let _guard = span.enter();
let emission_result = emission.execute(&mut ctx)?;
let mut execution = emission.create_execution_record(&emission_result);
execution.duration_ms = start.elapsed().as_millis() as u64;
self.executed_passes.push(execution);
if !emission_result.success {
return Err(Error::new(&format!(
"Emission failed: {:?}",
emission_result.error
)));
}
let elapsed = start.elapsed();
tracing::Span::current().record("pipeline.duration_ms", elapsed.as_millis() as u64);
tracing::info!(
stage = "mu3",
elapsed_ms = elapsed.as_millis() as u64,
"Pipeline stage completed"
);
drop(_guard);
let start = Instant::now();
let span = tracing::info_span!(
"pipeline.validate",
"operation.name" = "pipeline.validate",
"operation.type" = "pipeline",
"pipeline.stage" = "mu4",
);
let _guard = span.enter();
let canon_result = canonicalization.execute(&mut ctx)?;
let mut execution = canonicalization.create_execution_record(&canon_result);
execution.duration_ms = start.elapsed().as_millis() as u64;
self.executed_passes.push(execution);
if !canon_result.success {
return Err(Error::new(&format!(
"Canonicalization failed: {:?}",
canon_result.error
)));
}
let elapsed = start.elapsed();
tracing::Span::current().record("pipeline.duration_ms", elapsed.as_millis() as u64);
tracing::info!(
stage = "mu4",
elapsed_ms = elapsed.as_millis() as u64,
"Pipeline stage completed"
);
drop(_guard);
self.generated_files = ctx.generated_files.clone();
let output_records = self.create_output_records(&ctx)?;
let start = Instant::now();
let span = tracing::info_span!(
"pipeline.emit",
"operation.name" = "pipeline.emit",
"operation.type" = "pipeline",
"pipeline.stage" = "mu5",
);
let _guard = span.enter();
let epoch = self.epoch.as_ref().unwrap();
let mut receipt = BuildReceipt::new(
epoch,
self.executed_passes.clone(),
output_records,
&self.config.toolchain_version,
)
.with_policies(ReceiptPolicies {
blank_node_policy: "canonicalize".to_string(),
ordering_policy: "deterministic".to_string(),
formatting_policy: "language-specific".to_string(),
active_guards: vec!["path-guard".to_string(), "secret-guard".to_string()],
});
if let Some(ref resolved) = self.resolved_packs {
for expansion in &resolved.bundle_expansions {
receipt.add_bundle_expansion(BundleExpansionRef {
bundle_id: expansion.bundle_id.clone(),
expanded_to: expansion.expanded_to.clone(),
});
}
for pack_id in &resolved.atomic_packs {
let version = resolved
.pack_versions
.get(&pack_id.to_string())
.cloned()
.unwrap_or_else(|| "unknown".to_string());
receipt.add_pack(PackProvenance {
pack_id: pack_id.to_string(),
version,
signature: resolved
.pack_signatures
.get(&pack_id.to_string())
.cloned()
.unwrap_or_else(|| "local:unsigned".to_string()),
digest: resolved.digest_for_pack(pack_id),
registry_type: resolved
.pack_registry_types
.get(&pack_id.to_string())
.cloned(),
origin_url: resolved.pack_origin_urls.get(&pack_id.to_string()).cloned(),
templates_contributed: resolved.template_paths_for_pack(pack_id),
queries_contributed: resolved.query_names_for_pack(pack_id),
files_generated: vec![],
});
}
receipt.set_profile(crate::pipeline_engine::receipt::ProfileRef {
profile_id: resolved.profile.clone(),
runtime_constraints: vec![],
trust_requirement: TrustTier::Experimental,
});
}
if let Some(ref receipt_path) = self.config.receipt_path {
let full_receipt_path = self.config.base_path.join(receipt_path);
if let Some(parent) = full_receipt_path.parent() {
std::fs::create_dir_all(parent)?;
}
receipt.write_to_file(&full_receipt_path)?;
}
if matches!(
self.config.verify_mode,
VerifyMode::VerifyOutputs | VerifyMode::Full
) {
let output_dir = self.config.base_path.join(&self.config.output_dir);
if !receipt.verify_outputs(&output_dir)? {
return Err(Error::new(
"Output verification failed. hash(A) ≠ hash(μ(O))",
));
}
}
let elapsed = start.elapsed();
tracing::Span::current().record("pipeline.duration_ms", elapsed.as_millis() as u64);
tracing::info!(
stage = "mu5",
elapsed_ms = elapsed.as_millis() as u64,
"Pipeline stage completed"
);
drop(_guard);
Ok(receipt)
}
/// Builds one output record per generated file that exists on disk.
///
/// Paths listed in the context are resolved against its output directory;
/// entries whose file is missing are skipped. Every record is attributed to
/// the emission stage.
///
/// # Errors
///
/// Fails when a record cannot be created from an existing file.
fn create_output_records(&self, ctx: &PassContext<'_>) -> Result<Vec<OutputFile>> {
    let mut records = Vec::new();
    for relative in ctx.generated_files.iter() {
        let absolute = ctx.output_dir.join(relative);
        if absolute.exists() {
            let record = OutputFile::from_path(&absolute, relative.clone(), "μ₃:emission")?;
            records.push(record);
        }
    }
    Ok(records)
}
/// Returns the current input epoch, if one has been established.
pub fn epoch(&self) -> Option<&Epoch> {
    Option::as_ref(&self.epoch)
}
/// Returns the execution records of the passes that have been run.
pub fn executed_passes(&self) -> &[PassExecution] {
    self.executed_passes.as_slice()
}
/// Returns the files reported as generated by the most recent run.
pub fn generated_files(&self) -> &[PathBuf] {
    self.generated_files.as_slice()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pipeline_engine::vocabulary::AllowedVocabulary;
    use tempfile::TempDir;

    /// Values set through the builder methods end up in the configuration.
    #[test]
    fn test_pipeline_config_builder() {
        let built = PipelineConfig::new("test", "1.0.0")
            .with_ontology("ontology/domain.ttl")
            .with_output_dir(".")
            .with_verify_mode(VerifyMode::Full);

        assert_eq!(built.project_name, "test");
        assert_eq!(built.project_version, "1.0.0");
        assert_eq!(built.ontology_sources.len(), 1);
        assert_eq!(built.verify_mode, VerifyMode::Full);
    }

    /// A pipeline can be constructed from a default configuration.
    #[test]
    fn test_pipeline_creation() {
        let outcome = StagedPipeline::new(PipelineConfig::new("test", "1.0.0"));
        assert!(outcome.is_ok());
    }

    /// Running the pipeline over a small ontology yields a valid receipt
    /// stamped with this crate's version.
    #[test]
    fn test_pipeline_empty_run() {
        let workspace = TempDir::new().unwrap();
        let root = workspace.path();

        let ontology_dir = root.join("ontology");
        std::fs::create_dir_all(&ontology_dir).unwrap();
        let ontology_file = ontology_dir.join("domain.ttl");
        std::fs::write(
            ontology_file,
            r#"
@prefix ex: <http://example.org/> .
@prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#> .
ex:Person a rdfs:Class ;
rdfs:label "Person" .
"#,
        )
        .unwrap();

        // The example namespace is not a standard vocabulary, so allow it explicitly.
        let mut registry = VocabularyRegistry::with_standard_vocabularies();
        registry.add_allowed(
            AllowedVocabulary::new("http://example.org/", "ex")
                .with_description("Example namespace for testing"),
        );

        let config = PipelineConfig::new("test", "1.0.0")
            .with_base_path(root)
            .with_ontology(PathBuf::from("ontology/domain.ttl"))
            .with_output_dir("output");
        let mut pipeline = StagedPipeline::new(config)
            .unwrap()
            .with_vocabulary_registry(registry);

        let receipt = pipeline.run().unwrap();
        assert!(receipt.is_valid);
        assert_eq!(receipt.toolchain_version, env!("CARGO_PKG_VERSION"));
    }
}