use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::{Duration, Instant};
use crate::pipeline::{PipelineContext, ValidationResult};
use crate::types::Language;
/// Record of a single pipeline stage execution.
///
/// Built fluently via [`PipelinePath::new`] and the `with_*` builder methods,
/// then recorded into a [`PipelineAuditCollector`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PipelinePath {
    /// Name of the stage this record describes.
    pub stage_name: String,
    /// Wall-clock duration of the stage, in nanoseconds.
    pub duration_ns: u64,
    /// Whether the stage completed without error.
    pub success: bool,
    /// Error message when the stage failed; `None` on success.
    pub error: Option<String>,
    /// Number of files consumed by the stage.
    pub input_files: usize,
    /// Number of files produced by the stage.
    pub output_files: usize,
    /// Primary language handled by the stage, if known.
    pub language: Option<Language>,
    /// Names of optimizations applied during the stage.
    pub optimizations: Vec<String>,
    /// Validation outcome attached to the stage, if any.
    pub validation: Option<ValidationResult>,
    /// Free-form per-stage metadata.
    pub metadata: HashMap<String, serde_json::Value>,
    // Per-feature contribution scores; read via `feature_contributions()`.
    contributions: Vec<f32>,
    // Confidence in [0.0, 1.0]; zeroed by `with_error`, halved by a failed validation.
    confidence: f32,
}
impl PipelinePath {
pub fn new(stage_name: impl Into<String>) -> Self {
Self {
stage_name: stage_name.into(),
duration_ns: 0,
success: true,
error: None,
input_files: 0,
output_files: 0,
language: None,
optimizations: Vec::new(),
validation: None,
metadata: HashMap::new(),
contributions: Vec::new(),
confidence: 1.0,
}
}
pub fn with_duration(mut self, duration: Duration) -> Self {
self.duration_ns = duration.as_nanos() as u64;
self
}
pub fn with_error(mut self, error: impl Into<String>) -> Self {
self.success = false;
self.error = Some(error.into());
self.confidence = 0.0;
self
}
pub fn with_file_counts(mut self, input: usize, output: usize) -> Self {
self.input_files = input;
self.output_files = output;
self
}
pub fn with_language(mut self, lang: Language) -> Self {
self.language = Some(lang);
self
}
pub fn with_optimizations(mut self, opts: Vec<String>) -> Self {
self.optimizations = opts;
self
}
pub fn with_validation(mut self, validation: ValidationResult) -> Self {
if !validation.passed {
self.confidence *= 0.5;
}
self.validation = Some(validation);
self
}
pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
self.metadata.insert(key.into(), value);
self
}
pub fn with_contributions(mut self, contributions: Vec<f32>) -> Self {
self.contributions = contributions;
self
}
pub fn feature_contributions(&self) -> &[f32] {
&self.contributions
}
pub fn confidence(&self) -> f32 {
self.confidence
}
pub fn to_bytes(&self) -> Vec<u8> {
let mut bytes = Vec::new();
bytes.extend_from_slice(self.stage_name.as_bytes());
bytes.push(0);
bytes.extend_from_slice(&self.duration_ns.to_le_bytes());
bytes.push(u8::from(self.success));
if let Some(ref error) = self.error {
bytes.extend_from_slice(error.as_bytes());
}
bytes.push(0);
bytes.extend_from_slice(&(self.input_files as u64).to_le_bytes());
bytes.extend_from_slice(&(self.output_files as u64).to_le_bytes());
bytes.extend_from_slice(&self.confidence.to_le_bytes());
bytes
}
pub fn explain(&self) -> String {
let mut explanation = format!("Stage: {}\n", self.stage_name);
explanation
.push_str(&format!("Duration: {:.2}ms\n", self.duration_ns as f64 / 1_000_000.0));
explanation.push_str(&format!("Success: {}\n", self.success));
if let Some(ref error) = self.error {
explanation.push_str(&format!("Error: {}\n", error));
}
explanation.push_str(&format!(
"Files: {} input → {} output\n",
self.input_files, self.output_files
));
if let Some(ref lang) = self.language {
explanation.push_str(&format!("Language: {:?}\n", lang));
}
if !self.optimizations.is_empty() {
explanation.push_str(&format!("Optimizations: {}\n", self.optimizations.join(", ")));
}
explanation.push_str(&format!("Confidence: {:.1}%", self.confidence * 100.0));
explanation
}
}
/// A sequenced, timestamped stage record as stored in the audit hash chain.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PipelineTrace {
    /// Zero-based position of this trace within the collector's chain.
    pub sequence: u64,
    /// Capture time in nanoseconds since the Unix epoch (0 if the clock reads pre-epoch).
    pub timestamp_ns: u64,
    /// The stage record being audited.
    pub path: PipelinePath,
    /// Snapshot of the pipeline context at record time, when snapshot capture is enabled.
    pub context_snapshot: Option<ContextSnapshot>,
}
/// Lightweight summary of a [`PipelineContext`]: paths, language, and counts/keys
/// rather than full file mappings or metadata values.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ContextSnapshot {
    /// Pipeline input root path.
    pub input_path: PathBuf,
    /// Pipeline output root path.
    pub output_path: PathBuf,
    /// Primary language detected for the pipeline run, if any.
    pub language: Option<Language>,
    /// Number of file mappings present at snapshot time.
    pub file_mapping_count: usize,
    /// Metadata keys present at snapshot time (values deliberately omitted).
    pub metadata_keys: Vec<String>,
}
impl From<&PipelineContext> for ContextSnapshot {
fn from(ctx: &PipelineContext) -> Self {
Self {
input_path: ctx.input_path.clone(),
output_path: ctx.output_path.clone(),
language: ctx.primary_language.clone(),
file_mapping_count: ctx.file_mappings.len(),
metadata_keys: ctx.metadata.keys().cloned().collect(),
}
}
}
/// One link of the tamper-evident audit chain: a trace plus the hash linkage.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HashChainEntry {
    /// Zero-based position in the chain; mirrors `trace.sequence`.
    pub sequence: u64,
    /// Hash of the previous entry ([0u8; 32] for the first entry).
    pub prev_hash: [u8; 32],
    /// Hash of this entry, computed over `prev_hash` and the trace contents.
    pub hash: [u8; 32],
    /// The recorded trace this entry protects.
    pub trace: PipelineTrace,
}
/// Collects stage traces for one pipeline run into a hash-linked chain,
/// so that reordering, removal, or modification of entries is detectable
/// via [`PipelineAuditCollector::verify_chain`].
#[derive(Debug)]
pub struct PipelineAuditCollector {
    // Chain entries in recording order.
    entries: Vec<HashChainEntry>,
    // Sequence number to assign to the next recorded entry.
    next_sequence: u64,
    // Identifier of the pipeline run this collector belongs to.
    run_id: String,
    // When true, `record_stage` snapshots the supplied context.
    capture_snapshots: bool,
}
impl PipelineAuditCollector {
    /// Creates an empty collector for the given run id, with context snapshots enabled.
    pub fn new(run_id: impl Into<String>) -> Self {
        Self {
            entries: Vec::new(),
            next_sequence: 0,
            run_id: run_id.into(),
            capture_snapshots: true,
        }
    }

    /// Disables context snapshot capture (traces will carry `context_snapshot: None`).
    pub fn without_snapshots(mut self) -> Self {
        self.capture_snapshots = false;
        self
    }

    /// Returns the run identifier this collector was created with.
    pub fn run_id(&self) -> &str {
        &self.run_id
    }

    /// Records a stage into the chain and returns the newly appended entry.
    ///
    /// The entry's `prev_hash` links to the previous entry's hash (or all
    /// zeroes for the first entry), making later tampering detectable.
    pub fn record_stage(
        &mut self,
        path: PipelinePath,
        context: Option<&PipelineContext>,
    ) -> &HashChainEntry {
        // Pre-epoch clocks degrade to 0 rather than panicking.
        let timestamp_ns = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|d| d.as_nanos() as u64)
            .unwrap_or(0);
        let context_snapshot =
            if self.capture_snapshots { context.map(ContextSnapshot::from) } else { None };
        let trace =
            PipelineTrace { sequence: self.next_sequence, timestamp_ns, path, context_snapshot };
        let prev_hash = self.entries.last().map(|e| e.hash).unwrap_or([0u8; 32]);
        let hash = self.compute_hash(&trace, &prev_hash);
        let entry = HashChainEntry { sequence: self.next_sequence, prev_hash, hash, trace };
        self.entries.push(entry);
        self.next_sequence += 1;
        self.entries.last().expect("just pushed")
    }

    /// Computes the 32-byte chain hash for `trace` linked to `prev_hash`.
    ///
    /// The digest is four 64-bit FNV-1a lanes (each seeded with its lane
    /// index) over a canonical encoding of the previous hash, the sequence,
    /// the timestamp, and the trace's stage record (`PipelinePath::to_bytes`,
    /// which covers name, duration, success, error, file counts, and
    /// confidence). FNV-1a is implemented explicitly instead of using
    /// `DefaultHasher`, whose algorithm is unspecified and may change between
    /// Rust releases — an explicit hash keeps chains verifiable across builds.
    ///
    /// NOTE: FNV is not cryptographically secure; this chain detects
    /// accidental corruption and naive tampering, not a determined adversary.
    fn compute_hash(&self, trace: &PipelineTrace, prev_hash: &[u8; 32]) -> [u8; 32] {
        // Canonical byte encoding of everything the hash must cover.
        let mut data = Vec::with_capacity(96);
        data.extend_from_slice(prev_hash);
        data.extend_from_slice(&trace.sequence.to_le_bytes());
        data.extend_from_slice(&trace.timestamp_ns.to_le_bytes());
        data.extend_from_slice(&trace.path.to_bytes());
        let mut result = [0u8; 32];
        for lane in 0u64..4 {
            // FNV-1a 64-bit offset basis, perturbed per lane so the four
            // 8-byte words of the digest are independent.
            let mut h: u64 = 0xcbf2_9ce4_8422_2325 ^ lane.wrapping_mul(0x9e37_79b9_7f4a_7c15);
            for &byte in &data {
                h ^= u64::from(byte);
                h = h.wrapping_mul(0x0000_0100_0000_01b3); // FNV-1a 64-bit prime
            }
            let i = lane as usize;
            result[i * 8..(i + 1) * 8].copy_from_slice(&h.to_le_bytes());
        }
        result
    }

    /// Returns all chain entries in recording order.
    pub fn entries(&self) -> &[HashChainEntry] {
        &self.entries
    }

    /// Returns the number of recorded entries.
    pub fn len(&self) -> usize {
        self.entries.len()
    }

    /// Returns `true` if nothing has been recorded yet.
    pub fn is_empty(&self) -> bool {
        self.entries.is_empty()
    }

    /// Re-verifies the whole chain: each entry's `prev_hash` must match its
    /// predecessor's hash (all zeroes for the first entry), and each stored
    /// hash must match a fresh recomputation over the entry's trace.
    pub fn verify_chain(&self) -> ChainVerification {
        let mut entries_verified = 0;
        for (i, entry) in self.entries.iter().enumerate() {
            if i == 0 {
                if entry.prev_hash != [0u8; 32] {
                    return ChainVerification::invalid_at(entries_verified, 0);
                }
            } else {
                let expected_prev = self.entries[i - 1].hash;
                if entry.prev_hash != expected_prev {
                    return ChainVerification::invalid_at(entries_verified, i);
                }
            }
            let computed_hash = self.compute_hash(&entry.trace, &entry.prev_hash);
            if entry.hash != computed_hash {
                return ChainVerification::invalid_at(entries_verified, i);
            }
            entries_verified += 1;
        }
        ChainVerification::valid(entries_verified)
    }

    /// Returns up to `n` entries, most recent first.
    pub fn recent(&self, n: usize) -> Vec<&HashChainEntry> {
        self.entries.iter().rev().take(n).collect()
    }

    /// Exports the run id, chain length, a fresh verification verdict, and all
    /// entries as pretty-printed JSON.
    pub fn to_json(&self) -> Result<String, serde_json::Error> {
        #[derive(Serialize)]
        struct Export<'a> {
            run_id: &'a str,
            chain_length: usize,
            verified: bool,
            entries: &'a [HashChainEntry],
        }
        let verification = self.verify_chain();
        let export = Export {
            run_id: &self.run_id,
            chain_length: self.entries.len(),
            verified: verification.valid,
            entries: &self.entries,
        };
        serde_json::to_string_pretty(&export)
    }
}
/// Result of verifying an audit chain via [`PipelineAuditCollector::verify_chain`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ChainVerification {
    /// `true` when every entry's linkage and hash checked out.
    pub valid: bool,
    /// Number of entries that verified before success or the first break.
    pub entries_verified: usize,
    /// Index of the first broken entry, or `None` when the chain is valid.
    pub first_break: Option<usize>,
}
impl ChainVerification {
fn valid(entries_verified: usize) -> Self {
Self { valid: true, entries_verified, first_break: None }
}
fn invalid_at(entries_verified: usize, index: usize) -> Self {
Self { valid: false, entries_verified, first_break: Some(index) }
}
}
/// Measures the wall-clock duration of a single stage and converts the
/// measurement into a [`PipelinePath`] when stopped.
pub struct StageTimer {
    // Instant captured when the timer was started.
    start: Instant,
    // Name of the stage being timed.
    stage_name: String,
}
impl StageTimer {
    /// Starts timing a stage with the given name.
    pub fn start(stage_name: impl Into<String>) -> Self {
        let start = crate::timing::start_timer();
        Self { stage_name: stage_name.into(), start }
    }

    /// Stops the timer, producing a successful stage record carrying the
    /// elapsed duration.
    pub fn stop(self) -> PipelinePath {
        let elapsed = self.start.elapsed();
        PipelinePath::new(self.stage_name).with_duration(elapsed)
    }

    /// Stops the timer, producing a failed stage record carrying both the
    /// elapsed duration and `error`.
    pub fn stop_with_error(self, error: impl Into<String>) -> PipelinePath {
        let elapsed = self.start.elapsed();
        PipelinePath::new(self.stage_name)
            .with_duration(elapsed)
            .with_error(error)
    }
}
/// Creates a collector whose run id is derived from the current Unix time in
/// milliseconds (`run-<millis>`), degrading to `run-0` on a pre-epoch clock.
pub fn new_audit_collector() -> PipelineAuditCollector {
    let millis = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |d| d.as_millis());
    PipelineAuditCollector::new(format!("run-{}", millis))
}
/// Convenience wrapper: records a successful stage with the given duration
/// into `collector` and returns the appended chain entry.
pub fn record_success<'a>(
    collector: &'a mut PipelineAuditCollector,
    stage_name: &str,
    duration: Duration,
    context: Option<&PipelineContext>,
) -> &'a HashChainEntry {
    collector.record_stage(PipelinePath::new(stage_name).with_duration(duration), context)
}
/// Convenience wrapper: records a failed stage with the given duration and
/// error message into `collector` and returns the appended chain entry.
pub fn record_failure<'a>(
    collector: &'a mut PipelineAuditCollector,
    stage_name: &str,
    duration: Duration,
    error: &str,
    context: Option<&PipelineContext>,
) -> &'a HashChainEntry {
    let failed = PipelinePath::new(stage_name)
        .with_duration(duration)
        .with_error(error);
    collector.record_stage(failed, context)
}
/// Unit tests for stage records, the audit collector, timers, and the
/// free-function wrappers.
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    // Builds a collector and records one empty stage per name in `stages`.
    fn new_collector_with_stages(run_id: &str, stages: &[&str]) -> PipelineAuditCollector {
        let mut collector = PipelineAuditCollector::new(run_id);
        for stage in stages {
            collector.record_stage(PipelinePath::new(*stage), None);
        }
        collector
    }

    // Shared with `proptests`: builds a minimal ValidationResult fixture.
    pub(crate) fn make_validation(stage: &str, passed: bool) -> ValidationResult {
        ValidationResult {
            stage: stage.to_string(),
            passed,
            message: if passed { "OK" } else { "Failed" }.to_string(),
            details: None,
        }
    }

    // --- PipelinePath builder methods ---

    #[test]
    fn test_pipeline_path_creation() {
        let path = PipelinePath::new("Analysis");
        assert_eq!(path.stage_name, "Analysis");
        assert!(path.success);
        assert_eq!(path.confidence(), 1.0);
    }

    #[test]
    fn test_pipeline_path_with_duration() {
        let path = PipelinePath::new("Build").with_duration(Duration::from_millis(100));
        assert_eq!(path.duration_ns, 100_000_000);
    }

    #[test]
    fn test_pipeline_path_with_error() {
        let path = PipelinePath::new("Compile").with_error("Syntax error");
        assert!(!path.success);
        assert_eq!(path.error, Some("Syntax error".to_string()));
        // An error zeroes the confidence outright.
        assert_eq!(path.confidence(), 0.0);
    }

    #[test]
    fn test_pipeline_path_with_file_counts() {
        let path = PipelinePath::new("Transform").with_file_counts(10, 5);
        assert_eq!(path.input_files, 10);
        assert_eq!(path.output_files, 5);
    }

    #[test]
    fn test_pipeline_path_with_language() {
        let path = PipelinePath::new("Detect").with_language(Language::Python);
        assert_eq!(path.language, Some(Language::Python));
    }

    #[test]
    fn test_pipeline_path_with_optimizations() {
        let path =
            PipelinePath::new("Optimize").with_optimizations(vec!["SIMD".into(), "GPU".into()]);
        assert_eq!(path.optimizations.len(), 2);
    }

    #[test]
    fn test_pipeline_path_explain() {
        let path = PipelinePath::new("Test")
            .with_duration(Duration::from_millis(50))
            .with_file_counts(3, 2);
        let explanation = path.explain();
        assert!(explanation.contains("Test"));
        assert!(explanation.contains("50.00ms"));
        assert!(explanation.contains("3 input → 2 output"));
    }

    #[test]
    fn test_pipeline_path_to_bytes() {
        let path = PipelinePath::new("Stage");
        let bytes = path.to_bytes();
        assert!(!bytes.is_empty());
    }

    // --- PipelineAuditCollector: recording and chain integrity ---

    #[test]
    fn test_audit_collector_creation() {
        let collector = PipelineAuditCollector::new("test-run");
        assert_eq!(collector.run_id(), "test-run");
        assert!(collector.is_empty());
    }

    #[test]
    fn test_audit_collector_record_stage() {
        let mut collector = PipelineAuditCollector::new("test");
        let path = PipelinePath::new("Stage1");
        let entry = collector.record_stage(path, None);
        assert_eq!(entry.sequence, 0);
        // The first entry links back to the all-zero genesis hash.
        assert_eq!(entry.prev_hash, [0u8; 32]);
        assert_eq!(collector.len(), 1);
    }

    #[test]
    fn test_audit_collector_hash_chain_linkage() {
        let collector = new_collector_with_stages("test", &["Stage1", "Stage2", "Stage3"]);
        let entries = collector.entries();
        assert_eq!(entries[0].prev_hash, [0u8; 32]);
        assert_eq!(entries[1].prev_hash, entries[0].hash);
        assert_eq!(entries[2].prev_hash, entries[1].hash);
    }

    #[test]
    fn test_audit_collector_verify_chain_valid() {
        let collector = new_collector_with_stages("test", &["Stage1", "Stage2"]);
        let verification = collector.verify_chain();
        assert!(verification.valid);
        assert_eq!(verification.entries_verified, 2);
        assert!(verification.first_break.is_none());
    }

    #[test]
    fn test_audit_collector_recent() {
        let mut collector = PipelineAuditCollector::new("test");
        for i in 0..5 {
            collector.record_stage(PipelinePath::new(format!("Stage{}", i)), None);
        }
        // `recent` yields newest-first.
        let recent = collector.recent(3);
        assert_eq!(recent.len(), 3);
        assert_eq!(recent[0].sequence, 4);
        assert_eq!(recent[1].sequence, 3);
        assert_eq!(recent[2].sequence, 2);
    }

    #[test]
    fn test_audit_collector_to_json() {
        let mut collector = PipelineAuditCollector::new("test");
        collector.record_stage(PipelinePath::new("Stage1"), None);
        let json = collector.to_json().expect("unexpected failure");
        assert!(json.contains("test"));
        assert!(json.contains("Stage1"));
        assert!(json.contains("verified"));
    }

    // --- StageTimer ---

    #[test]
    fn test_stage_timer() {
        let timer = StageTimer::start("Test");
        let path = timer.stop();
        assert_eq!(path.stage_name, "Test");
        assert!(path.success);
    }

    #[test]
    fn test_stage_timer_with_error() {
        let timer = StageTimer::start("Test");
        let path = timer.stop_with_error("Failed");
        assert!(!path.success);
        assert_eq!(path.error, Some("Failed".to_string()));
    }

    // --- Free-function convenience wrappers ---

    #[test]
    fn test_new_audit_collector() {
        let collector = new_audit_collector();
        assert!(collector.run_id().starts_with("run-"));
    }

    #[test]
    fn test_record_success() {
        let mut collector = new_audit_collector();
        let entry = record_success(&mut collector, "Stage", Duration::from_millis(100), None);
        assert!(entry.trace.path.success);
        assert_eq!(entry.trace.path.stage_name, "Stage");
    }

    #[test]
    fn test_record_failure() {
        let mut collector = new_audit_collector();
        let entry = record_failure(
            &mut collector,
            "Stage",
            Duration::from_millis(50),
            "Error message",
            None,
        );
        assert!(!entry.trace.path.success);
        assert_eq!(entry.trace.path.error, Some("Error message".to_string()));
    }

    // --- Context snapshots ---

    #[test]
    fn test_context_snapshot() {
        let ctx = PipelineContext::new(
            std::path::PathBuf::from("/input"),
            std::path::PathBuf::from("/output"),
        );
        let snapshot = ContextSnapshot::from(&ctx);
        assert_eq!(snapshot.input_path, std::path::PathBuf::from("/input"));
        assert_eq!(snapshot.output_path, std::path::PathBuf::from("/output"));
    }

    #[test]
    fn test_collector_without_snapshots() {
        let mut collector = PipelineAuditCollector::new("test").without_snapshots();
        let ctx = PipelineContext::new(
            std::path::PathBuf::from("/input"),
            std::path::PathBuf::from("/output"),
        );
        collector.record_stage(PipelinePath::new("Stage"), Some(&ctx));
        assert!(collector.entries()[0].trace.context_snapshot.is_none());
    }

    // --- Validation / metadata / contributions ---

    #[test]
    fn test_pipeline_path_with_validation_passed() {
        let path = PipelinePath::new("Stage").with_validation(make_validation("Test", true));
        assert_eq!(path.confidence(), 1.0);
    }

    #[test]
    fn test_pipeline_path_with_validation_failed() {
        // A failed validation halves the starting confidence of 1.0.
        let path = PipelinePath::new("Stage").with_validation(make_validation("Test", false));
        assert_eq!(path.confidence(), 0.5);
    }

    #[test]
    fn test_pipeline_path_with_metadata() {
        let path = PipelinePath::new("Stage")
            .with_metadata("key1", serde_json::json!("value1"))
            .with_metadata("key2", serde_json::json!(42));
        assert_eq!(path.metadata.len(), 2);
        assert_eq!(path.metadata.get("key1"), Some(&serde_json::json!("value1")));
        assert_eq!(path.metadata.get("key2"), Some(&serde_json::json!(42)));
    }

    #[test]
    fn test_pipeline_path_with_contributions() {
        let contributions = vec![0.1, -0.2, 0.3];
        let path = PipelinePath::new("Stage").with_contributions(contributions.clone());
        assert_eq!(path.feature_contributions(), &contributions);
    }

    #[test]
    fn test_chain_verification_serialization() {
        let verification = ChainVerification::valid(5);
        let json = serde_json::to_string(&verification).expect("json serialize failed");
        let deserialized: ChainVerification =
            serde_json::from_str(&json).expect("json deserialize failed");
        assert_eq!(verification.valid, deserialized.valid);
        assert_eq!(verification.entries_verified, deserialized.entries_verified);
    }
}
/// Property-based tests covering chain validity, sequence monotonicity,
/// confidence bounds, byte-encoding determinism, and `recent` counts.
#[cfg(test)]
mod proptests {
    use super::tests::make_validation;
    use super::*;
    use proptest::prelude::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(100))]

        // Any sequence of recorded stages must yield a fully valid chain.
        #[test]
        fn prop_hash_chain_always_valid(n in 1usize..20) {
            let mut collector = PipelineAuditCollector::new("prop-test");
            for i in 0..n {
                collector.record_stage(PipelinePath::new(format!("Stage{}", i)), None);
            }
            let verification = collector.verify_chain();
            prop_assert!(verification.valid);
            prop_assert_eq!(verification.entries_verified, n);
        }

        // Sequence numbers must be strictly increasing in recording order.
        #[test]
        fn prop_sequence_numbers_monotonic(n in 2usize..20) {
            let mut collector = PipelineAuditCollector::new("prop-test");
            for i in 0..n {
                collector.record_stage(PipelinePath::new(format!("Stage{}", i)), None);
            }
            let entries = collector.entries();
            for i in 1..entries.len() {
                prop_assert!(entries[i].sequence > entries[i-1].sequence);
            }
        }

        // Confidence stays within [0.0, 1.0] regardless of error/validation outcome.
        #[test]
        fn prop_path_confidence_bounded(
            success in any::<bool>(),
            validation_passed in any::<bool>()
        ) {
            let mut path = PipelinePath::new("Test");
            if !success {
                path = path.with_error("Error");
            }
            path = path.with_validation(make_validation("Test", validation_passed));
            let confidence = path.confidence();
            prop_assert!(confidence >= 0.0);
            prop_assert!(confidence <= 1.0);
        }

        // Two identical records must serialize to identical bytes.
        #[test]
        fn prop_to_bytes_deterministic(stage_name in "[a-z]{1,20}") {
            let path1 = PipelinePath::new(&stage_name);
            let path2 = PipelinePath::new(&stage_name);
            let bytes1 = path1.to_bytes();
            let bytes2 = path2.to_bytes();
            prop_assert_eq!(bytes1, bytes2);
        }

        // `recent(take)` returns min(take, n) entries.
        #[test]
        fn prop_recent_count_correct(n in 1usize..50, take in 1usize..20) {
            let mut collector = PipelineAuditCollector::new("test");
            for i in 0..n {
                collector.record_stage(PipelinePath::new(format!("S{}", i)), None);
            }
            let recent = collector.recent(take);
            let expected = take.min(n);
            prop_assert_eq!(recent.len(), expected);
        }
    }
}