1use serde::{Deserialize, Serialize};
16use std::collections::HashMap;
17use std::path::PathBuf;
18use std::time::{Duration, Instant};
19
20use crate::pipeline::{PipelineContext, ValidationResult};
21use crate::types::Language;
22
/// Record of a single pipeline stage execution; the payload of an audit trace.
///
/// Construct with [`PipelinePath::new`] and refine with the `with_*` builder methods.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PipelinePath {
    /// Name of the stage this record describes.
    pub stage_name: String,

    /// Duration of the stage in nanoseconds (0 until a duration is attached).
    pub duration_ns: u64,

    /// Whether the stage succeeded; starts `true` and is cleared by `with_error`.
    pub success: bool,

    /// Error message for a failed stage, if any.
    pub error: Option<String>,

    /// Number of files consumed by the stage.
    pub input_files: usize,

    /// Number of files produced by the stage.
    pub output_files: usize,

    /// Language associated with the stage, if known.
    pub language: Option<Language>,

    /// Names of optimizations applied during the stage.
    pub optimizations: Vec<String>,

    /// Validation result attached to the stage, if any.
    pub validation: Option<ValidationResult>,

    /// Free-form metadata attached to the stage.
    pub metadata: HashMap<String, serde_json::Value>,

    /// Per-feature contribution scores; exposed through `feature_contributions()`.
    contributions: Vec<f32>,

    /// Confidence in the stage result: starts at 1.0, set to 0.0 by `with_error`,
    /// halved by a failed `with_validation`; exposed through `confidence()`.
    confidence: f32,
}
68
69impl PipelinePath {
70 pub fn new(stage_name: impl Into<String>) -> Self {
72 Self {
73 stage_name: stage_name.into(),
74 duration_ns: 0,
75 success: true,
76 error: None,
77 input_files: 0,
78 output_files: 0,
79 language: None,
80 optimizations: Vec::new(),
81 validation: None,
82 metadata: HashMap::new(),
83 contributions: Vec::new(),
84 confidence: 1.0,
85 }
86 }
87
88 pub fn with_duration(mut self, duration: Duration) -> Self {
90 self.duration_ns = duration.as_nanos() as u64;
91 self
92 }
93
94 pub fn with_error(mut self, error: impl Into<String>) -> Self {
96 self.success = false;
97 self.error = Some(error.into());
98 self.confidence = 0.0;
99 self
100 }
101
102 pub fn with_file_counts(mut self, input: usize, output: usize) -> Self {
104 self.input_files = input;
105 self.output_files = output;
106 self
107 }
108
109 pub fn with_language(mut self, lang: Language) -> Self {
111 self.language = Some(lang);
112 self
113 }
114
115 pub fn with_optimizations(mut self, opts: Vec<String>) -> Self {
117 self.optimizations = opts;
118 self
119 }
120
121 pub fn with_validation(mut self, validation: ValidationResult) -> Self {
123 if !validation.passed {
124 self.confidence *= 0.5;
125 }
126 self.validation = Some(validation);
127 self
128 }
129
130 pub fn with_metadata(mut self, key: impl Into<String>, value: serde_json::Value) -> Self {
132 self.metadata.insert(key.into(), value);
133 self
134 }
135
136 pub fn with_contributions(mut self, contributions: Vec<f32>) -> Self {
138 self.contributions = contributions;
139 self
140 }
141
142 pub fn feature_contributions(&self) -> &[f32] {
144 &self.contributions
145 }
146
147 pub fn confidence(&self) -> f32 {
149 self.confidence
150 }
151
152 pub fn to_bytes(&self) -> Vec<u8> {
154 let mut bytes = Vec::new();
156
157 bytes.extend_from_slice(self.stage_name.as_bytes());
159 bytes.push(0);
160
161 bytes.extend_from_slice(&self.duration_ns.to_le_bytes());
163
164 bytes.push(u8::from(self.success));
166
167 if let Some(ref error) = self.error {
169 bytes.extend_from_slice(error.as_bytes());
170 }
171 bytes.push(0);
172
173 bytes.extend_from_slice(&(self.input_files as u64).to_le_bytes());
175 bytes.extend_from_slice(&(self.output_files as u64).to_le_bytes());
176
177 bytes.extend_from_slice(&self.confidence.to_le_bytes());
179
180 bytes
181 }
182
183 pub fn explain(&self) -> String {
185 let mut explanation = format!("Stage: {}\n", self.stage_name);
186 explanation
187 .push_str(&format!("Duration: {:.2}ms\n", self.duration_ns as f64 / 1_000_000.0));
188 explanation.push_str(&format!("Success: {}\n", self.success));
189
190 if let Some(ref error) = self.error {
191 explanation.push_str(&format!("Error: {}\n", error));
192 }
193
194 explanation.push_str(&format!(
195 "Files: {} input → {} output\n",
196 self.input_files, self.output_files
197 ));
198
199 if let Some(ref lang) = self.language {
200 explanation.push_str(&format!("Language: {:?}\n", lang));
201 }
202
203 if !self.optimizations.is_empty() {
204 explanation.push_str(&format!("Optimizations: {}\n", self.optimizations.join(", ")));
205 }
206
207 explanation.push_str(&format!("Confidence: {:.1}%", self.confidence * 100.0));
208
209 explanation
210 }
211}
212
/// One audited stage execution: the stage record plus ordering and timing metadata.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct PipelineTrace {
    /// Position of this trace in the collector's chain (0-based, assigned by
    /// `PipelineAuditCollector::record_stage`).
    pub sequence: u64,

    /// Wall-clock time the trace was recorded, in nanoseconds since the Unix epoch
    /// (0 if the system clock reported a time before the epoch).
    pub timestamp_ns: u64,

    /// The stage record being audited.
    pub path: PipelinePath,

    /// Summary of the pipeline context at record time; `None` when no context was
    /// supplied or the collector has snapshot capture disabled.
    pub context_snapshot: Option<ContextSnapshot>,
}
232
/// Lightweight, serializable summary of a `PipelineContext` (built via its `From` impl).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct ContextSnapshot {
    /// Copy of the context's input path.
    pub input_path: PathBuf,

    /// Copy of the context's output path.
    pub output_path: PathBuf,

    /// Copy of the context's primary language, if one was set.
    pub language: Option<Language>,

    /// Number of entries in the context's file mappings (the mappings themselves are
    /// not captured).
    pub file_mapping_count: usize,

    /// Names of the context's metadata keys (values are not captured), in the order
    /// the source map iterated them.
    pub metadata_keys: Vec<String>,
}
251
252impl From<&PipelineContext> for ContextSnapshot {
253 fn from(ctx: &PipelineContext) -> Self {
254 Self {
255 input_path: ctx.input_path.clone(),
256 output_path: ctx.output_path.clone(),
257 language: ctx.primary_language.clone(),
258 file_mapping_count: ctx.file_mappings.len(),
259 metadata_keys: ctx.metadata.keys().cloned().collect(),
260 }
261 }
262}
263
/// A single link in the audit hash chain.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct HashChainEntry {
    /// Sequence number of this entry; `record_stage` sets it to the same value as
    /// `trace.sequence`.
    pub sequence: u64,

    /// Hash of the previous entry in the chain; all zeroes for the first entry.
    pub prev_hash: [u8; 32],

    /// Hash computed over `prev_hash` and selected fields of `trace`.
    pub hash: [u8; 32],

    /// The audited trace this link protects.
    pub trace: PipelineTrace,
}
283
/// Collector that records pipeline stage traces as a hash chain.
///
/// Entries are only added through `record_stage` and are linked via `prev_hash`;
/// `verify_chain` re-checks the links.
/// NOTE(review): the chain hash is built on `std`'s `DefaultHasher`, which is not a
/// cryptographic hash — treat the chain as corruption detection, not tamper-proofing.
#[derive(Debug)]
pub struct PipelineAuditCollector {
    /// Chain entries in the order they were recorded.
    entries: Vec<HashChainEntry>,

    /// Sequence number that the next recorded stage will receive.
    next_sequence: u64,

    /// Identifier of the pipeline run this collector belongs to.
    run_id: String,

    /// Whether `record_stage` stores a `ContextSnapshot` when given a context.
    capture_snapshots: bool,
}
305
306impl PipelineAuditCollector {
307 pub fn new(run_id: impl Into<String>) -> Self {
309 Self {
310 entries: Vec::new(),
311 next_sequence: 0,
312 run_id: run_id.into(),
313 capture_snapshots: true,
314 }
315 }
316
317 pub fn without_snapshots(mut self) -> Self {
319 self.capture_snapshots = false;
320 self
321 }
322
323 pub fn run_id(&self) -> &str {
325 &self.run_id
326 }
327
328 pub fn record_stage(
330 &mut self,
331 path: PipelinePath,
332 context: Option<&PipelineContext>,
333 ) -> &HashChainEntry {
334 let timestamp_ns = std::time::SystemTime::now()
335 .duration_since(std::time::UNIX_EPOCH)
336 .map(|d| d.as_nanos() as u64)
337 .unwrap_or(0);
338
339 let context_snapshot =
340 if self.capture_snapshots { context.map(ContextSnapshot::from) } else { None };
341
342 let trace =
343 PipelineTrace { sequence: self.next_sequence, timestamp_ns, path, context_snapshot };
344
345 let prev_hash = self.entries.last().map(|e| e.hash).unwrap_or([0u8; 32]);
347
348 let hash = self.compute_hash(&trace, &prev_hash);
350
351 let entry = HashChainEntry { sequence: self.next_sequence, prev_hash, hash, trace };
352
353 self.entries.push(entry);
354 self.next_sequence += 1;
355
356 self.entries.last().expect("just pushed")
357 }
358
359 fn compute_hash(&self, trace: &PipelineTrace, prev_hash: &[u8; 32]) -> [u8; 32] {
361 use std::collections::hash_map::DefaultHasher;
362 use std::hash::{Hash, Hasher};
363
364 let mut hasher = DefaultHasher::new();
366
367 prev_hash.hash(&mut hasher);
369
370 trace.sequence.hash(&mut hasher);
372 trace.timestamp_ns.hash(&mut hasher);
373 trace.path.stage_name.hash(&mut hasher);
374 trace.path.duration_ns.hash(&mut hasher);
375 trace.path.success.hash(&mut hasher);
376
377 let hash_value = hasher.finish();
378
379 let mut result = [0u8; 32];
381 for i in 0..4 {
382 result[i * 8..(i + 1) * 8].copy_from_slice(&hash_value.to_le_bytes());
383 }
384
385 result
386 }
387
388 pub fn entries(&self) -> &[HashChainEntry] {
390 &self.entries
391 }
392
393 pub fn len(&self) -> usize {
395 self.entries.len()
396 }
397
398 pub fn is_empty(&self) -> bool {
400 self.entries.is_empty()
401 }
402
403 pub fn verify_chain(&self) -> ChainVerification {
405 let mut entries_verified = 0;
406
407 for (i, entry) in self.entries.iter().enumerate() {
408 if i == 0 {
410 if entry.prev_hash != [0u8; 32] {
411 return ChainVerification::invalid_at(entries_verified, 0);
412 }
413 } else {
414 let expected_prev = self.entries[i - 1].hash;
415 if entry.prev_hash != expected_prev {
416 return ChainVerification::invalid_at(entries_verified, i);
417 }
418 }
419
420 let computed_hash = self.compute_hash(&entry.trace, &entry.prev_hash);
422 if entry.hash != computed_hash {
423 return ChainVerification::invalid_at(entries_verified, i);
424 }
425
426 entries_verified += 1;
427 }
428
429 ChainVerification::valid(entries_verified)
430 }
431
432 pub fn recent(&self, n: usize) -> Vec<&HashChainEntry> {
434 self.entries.iter().rev().take(n).collect()
435 }
436
437 pub fn to_json(&self) -> Result<String, serde_json::Error> {
439 #[derive(Serialize)]
440 struct Export<'a> {
441 run_id: &'a str,
442 chain_length: usize,
443 verified: bool,
444 entries: &'a [HashChainEntry],
445 }
446
447 let verification = self.verify_chain();
448
449 let export = Export {
450 run_id: &self.run_id,
451 chain_length: self.entries.len(),
452 verified: verification.valid,
453 entries: &self.entries,
454 };
455
456 serde_json::to_string_pretty(&export)
457 }
458}
459
460#[derive(Clone, Debug, Serialize, Deserialize)]
462pub struct ChainVerification {
463 pub valid: bool,
465
466 pub entries_verified: usize,
468
469 pub first_break: Option<usize>,
471}
472
473impl ChainVerification {
474 fn valid(entries_verified: usize) -> Self {
475 Self { valid: true, entries_verified, first_break: None }
476 }
477 fn invalid_at(entries_verified: usize, index: usize) -> Self {
478 Self { valid: false, entries_verified, first_break: Some(index) }
479 }
480}
481
/// Measures how long a pipeline stage runs and converts the result into a `PipelinePath`.
///
/// Nothing is recorded if the timer is dropped without calling `stop` or
/// `stop_with_error`.
#[derive(Debug)]
pub struct StageTimer {
    /// Moment the stage started, as returned by `crate::timing::start_timer`.
    start: Instant,
    /// Name of the stage being timed.
    stage_name: String,
}
491
492impl StageTimer {
493 pub fn start(stage_name: impl Into<String>) -> Self {
495 Self { start: crate::timing::start_timer(), stage_name: stage_name.into() }
496 }
497
498 pub fn stop(self) -> PipelinePath {
500 let duration = self.start.elapsed();
501 PipelinePath::new(self.stage_name).with_duration(duration)
502 }
503
504 pub fn stop_with_error(self, error: impl Into<String>) -> PipelinePath {
506 let duration = self.start.elapsed();
507 PipelinePath::new(self.stage_name).with_duration(duration).with_error(error)
508 }
509}
510
511pub fn new_audit_collector() -> PipelineAuditCollector {
517 let run_id = format!(
518 "run-{}",
519 std::time::SystemTime::now()
520 .duration_since(std::time::UNIX_EPOCH)
521 .map(|d| d.as_millis())
522 .unwrap_or(0)
523 );
524 PipelineAuditCollector::new(run_id)
525}
526
527pub fn record_success<'a>(
529 collector: &'a mut PipelineAuditCollector,
530 stage_name: &str,
531 duration: Duration,
532 context: Option<&PipelineContext>,
533) -> &'a HashChainEntry {
534 let path = PipelinePath::new(stage_name).with_duration(duration);
535 collector.record_stage(path, context)
536}
537
538pub fn record_failure<'a>(
540 collector: &'a mut PipelineAuditCollector,
541 stage_name: &str,
542 duration: Duration,
543 error: &str,
544 context: Option<&PipelineContext>,
545) -> &'a HashChainEntry {
546 let path = PipelinePath::new(stage_name).with_duration(duration).with_error(error);
547 collector.record_stage(path, context)
548}
549
#[cfg(test)]
// Unwrapping is fine in tests: a panic is exactly how a failing test should report.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Creates a collector for `run_id` and records one default stage per name in `stages`.
    fn new_collector_with_stages(run_id: &str, stages: &[&str]) -> PipelineAuditCollector {
        let mut collector = PipelineAuditCollector::new(run_id);
        for stage in stages {
            collector.record_stage(PipelinePath::new(*stage), None);
        }
        collector
    }

    /// Builds a `ValidationResult` for `stage`; also used by the sibling `proptests` module.
    pub(crate) fn make_validation(stage: &str, passed: bool) -> ValidationResult {
        ValidationResult {
            stage: stage.to_string(),
            passed,
            message: if passed { "OK" } else { "Failed" }.to_string(),
            details: None,
        }
    }

    #[test]
    fn test_pipeline_path_creation() {
        let path = PipelinePath::new("Analysis");
        assert_eq!(path.stage_name, "Analysis");
        assert!(path.success);
        assert_eq!(path.confidence(), 1.0);
    }

    #[test]
    fn test_pipeline_path_with_duration() {
        let path = PipelinePath::new("Build").with_duration(Duration::from_millis(100));
        assert_eq!(path.duration_ns, 100_000_000);
    }

    #[test]
    fn test_pipeline_path_with_error() {
        let path = PipelinePath::new("Compile").with_error("Syntax error");
        assert!(!path.success);
        assert_eq!(path.error, Some("Syntax error".to_string()));
        assert_eq!(path.confidence(), 0.0);
    }

    #[test]
    fn test_pipeline_path_with_file_counts() {
        let path = PipelinePath::new("Transform").with_file_counts(10, 5);
        assert_eq!(path.input_files, 10);
        assert_eq!(path.output_files, 5);
    }

    #[test]
    fn test_pipeline_path_with_language() {
        let path = PipelinePath::new("Detect").with_language(Language::Python);
        assert_eq!(path.language, Some(Language::Python));
    }

    #[test]
    fn test_pipeline_path_with_optimizations() {
        let path =
            PipelinePath::new("Optimize").with_optimizations(vec!["SIMD".into(), "GPU".into()]);
        assert_eq!(path.optimizations.len(), 2);
    }

    #[test]
    fn test_pipeline_path_explain() {
        let path = PipelinePath::new("Test")
            .with_duration(Duration::from_millis(50))
            .with_file_counts(3, 2);
        let explanation = path.explain();
        assert!(explanation.contains("Test"));
        assert!(explanation.contains("50.00ms"));
        assert!(explanation.contains("3 input → 2 output"));
    }

    #[test]
    fn test_pipeline_path_to_bytes() {
        let bytes = PipelinePath::new("Stage").to_bytes();
        // "Stage" + NUL (6), duration (8), success (1), empty error + NUL (1),
        // input/output counts (8 + 8), confidence (4).
        assert_eq!(bytes.len(), 36);
        assert_eq!(&bytes[..6], b"Stage\0");
        assert_eq!(&bytes[32..], &1.0f32.to_le_bytes());
    }

    #[test]
    fn test_audit_collector_creation() {
        let collector = PipelineAuditCollector::new("test-run");
        assert_eq!(collector.run_id(), "test-run");
        assert!(collector.is_empty());
    }

    #[test]
    fn test_audit_collector_record_stage() {
        let mut collector = PipelineAuditCollector::new("test");
        let path = PipelinePath::new("Stage1");

        let entry = collector.record_stage(path, None);

        assert_eq!(entry.sequence, 0);
        assert_eq!(entry.prev_hash, [0u8; 32]);
        assert_eq!(collector.len(), 1);
    }

    #[test]
    fn test_audit_collector_hash_chain_linkage() {
        let collector = new_collector_with_stages("test", &["Stage1", "Stage2", "Stage3"]);

        let entries = collector.entries();

        assert_eq!(entries[0].prev_hash, [0u8; 32]);
        assert_eq!(entries[1].prev_hash, entries[0].hash);
        assert_eq!(entries[2].prev_hash, entries[1].hash);
    }

    #[test]
    fn test_audit_collector_verify_chain_valid() {
        let collector = new_collector_with_stages("test", &["Stage1", "Stage2"]);

        let verification = collector.verify_chain();
        assert!(verification.valid);
        assert_eq!(verification.entries_verified, 2);
        assert!(verification.first_break.is_none());
    }

    #[test]
    fn test_audit_collector_verify_empty_chain() {
        let verification = PipelineAuditCollector::new("test").verify_chain();
        assert!(verification.valid);
        assert_eq!(verification.entries_verified, 0);
        assert!(verification.first_break.is_none());
    }

    #[test]
    fn test_audit_collector_verify_chain_detects_tampering() {
        let mut collector = new_collector_with_stages("test", &["Stage1", "Stage2", "Stage3"]);
        collector.entries[1].trace.path.stage_name = "Forged".to_string();

        let verification = collector.verify_chain();
        assert!(!verification.valid);
        assert_eq!(verification.entries_verified, 1);
        assert_eq!(verification.first_break, Some(1));
    }

    #[test]
    fn test_audit_collector_verify_chain_detects_broken_link() {
        let mut collector = new_collector_with_stages("test", &["Stage1", "Stage2", "Stage3"]);
        collector.entries[2].prev_hash = [1u8; 32];

        let verification = collector.verify_chain();
        assert!(!verification.valid);
        assert_eq!(verification.entries_verified, 2);
        assert_eq!(verification.first_break, Some(2));
    }

    #[test]
    fn test_audit_collector_recent() {
        let mut collector = PipelineAuditCollector::new("test");

        for i in 0..5 {
            collector.record_stage(PipelinePath::new(format!("Stage{}", i)), None);
        }

        let recent = collector.recent(3);
        assert_eq!(recent.len(), 3);
        assert_eq!(recent[0].sequence, 4);
        assert_eq!(recent[1].sequence, 3);
        assert_eq!(recent[2].sequence, 2);
    }

    #[test]
    fn test_audit_collector_to_json() {
        let mut collector = PipelineAuditCollector::new("test");
        collector.record_stage(PipelinePath::new("Stage1"), None);

        let json = collector.to_json().expect("unexpected failure");
        assert!(json.contains("test"));
        assert!(json.contains("Stage1"));
        assert!(json.contains("verified"));
    }

    #[test]
    fn test_stage_timer() {
        let timer = StageTimer::start("Test");
        let path = timer.stop();

        assert_eq!(path.stage_name, "Test");
        assert!(path.success);
    }

    #[test]
    fn test_stage_timer_with_error() {
        let timer = StageTimer::start("Test");
        let path = timer.stop_with_error("Failed");

        assert!(!path.success);
        assert_eq!(path.error, Some("Failed".to_string()));
    }

    #[test]
    fn test_new_audit_collector() {
        let collector = new_audit_collector();
        assert!(collector.run_id().starts_with("run-"));
    }

    #[test]
    fn test_record_success() {
        let mut collector = new_audit_collector();
        let entry = record_success(&mut collector, "Stage", Duration::from_millis(100), None);

        assert!(entry.trace.path.success);
        assert_eq!(entry.trace.path.stage_name, "Stage");
    }

    #[test]
    fn test_record_failure() {
        let mut collector = new_audit_collector();
        let entry = record_failure(
            &mut collector,
            "Stage",
            Duration::from_millis(50),
            "Error message",
            None,
        );

        assert!(!entry.trace.path.success);
        assert_eq!(entry.trace.path.error, Some("Error message".to_string()));
    }

    #[test]
    fn test_context_snapshot() {
        let ctx = PipelineContext::new(
            std::path::PathBuf::from("/input"),
            std::path::PathBuf::from("/output"),
        );
        let snapshot = ContextSnapshot::from(&ctx);

        assert_eq!(snapshot.input_path, std::path::PathBuf::from("/input"));
        assert_eq!(snapshot.output_path, std::path::PathBuf::from("/output"));
    }

    #[test]
    fn test_collector_without_snapshots() {
        let mut collector = PipelineAuditCollector::new("test").without_snapshots();

        let ctx = PipelineContext::new(
            std::path::PathBuf::from("/input"),
            std::path::PathBuf::from("/output"),
        );

        collector.record_stage(PipelinePath::new("Stage"), Some(&ctx));

        assert!(collector.entries()[0].trace.context_snapshot.is_none());
    }

    #[test]
    fn test_pipeline_path_with_validation_passed() {
        let path = PipelinePath::new("Stage").with_validation(make_validation("Test", true));
        assert_eq!(path.confidence(), 1.0);
    }

    #[test]
    fn test_pipeline_path_with_validation_failed() {
        let path = PipelinePath::new("Stage").with_validation(make_validation("Test", false));
        assert_eq!(path.confidence(), 0.5);
    }

    #[test]
    fn test_pipeline_path_with_metadata() {
        let path = PipelinePath::new("Stage")
            .with_metadata("key1", serde_json::json!("value1"))
            .with_metadata("key2", serde_json::json!(42));

        assert_eq!(path.metadata.len(), 2);
        assert_eq!(path.metadata.get("key1"), Some(&serde_json::json!("value1")));
        assert_eq!(path.metadata.get("key2"), Some(&serde_json::json!(42)));
    }

    #[test]
    fn test_pipeline_path_with_contributions() {
        let contributions = vec![0.1, -0.2, 0.3];
        let path = PipelinePath::new("Stage").with_contributions(contributions.clone());

        assert_eq!(path.feature_contributions(), &contributions);
    }

    #[test]
    fn test_chain_verification_serialization() {
        let verification = ChainVerification::valid(5);

        let json = serde_json::to_string(&verification).expect("json serialize failed");
        let deserialized: ChainVerification =
            serde_json::from_str(&json).expect("json deserialize failed");

        assert_eq!(verification.valid, deserialized.valid);
        assert_eq!(verification.entries_verified, deserialized.entries_verified);
        assert_eq!(verification.first_break, deserialized.first_break);
    }
}
824
#[cfg(test)]
mod proptests {
    use super::tests::make_validation;
    use super::*;
    use proptest::prelude::*;

    /// Builds a collector for `run_id` holding `count` stages named `{prefix}{index}`.
    fn collector_with(run_id: &str, prefix: &str, count: usize) -> PipelineAuditCollector {
        let mut collector = PipelineAuditCollector::new(run_id);
        (0..count).for_each(|index| {
            collector.record_stage(PipelinePath::new(format!("{}{}", prefix, index)), None);
        });
        collector
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(100))]

        /// A freshly built chain of any length always verifies completely.
        #[test]
        fn prop_hash_chain_always_valid(n in 1usize..20) {
            let verification = collector_with("prop-test", "Stage", n).verify_chain();
            prop_assert!(verification.valid);
            prop_assert_eq!(verification.entries_verified, n);
        }

        /// Sequence numbers strictly increase along the chain.
        #[test]
        fn prop_sequence_numbers_monotonic(n in 2usize..20) {
            let collector = collector_with("prop-test", "Stage", n);
            for pair in collector.entries().windows(2) {
                prop_assert!(pair[1].sequence > pair[0].sequence);
            }
        }

        /// Confidence stays within [0, 1] for every error/validation combination.
        #[test]
        fn prop_path_confidence_bounded(
            success in any::<bool>(),
            validation_passed in any::<bool>()
        ) {
            let base = PipelinePath::new("Test");
            let base = if success { base } else { base.with_error("Error") };
            let validated = base.with_validation(make_validation("Test", validation_passed));

            let confidence = validated.confidence();
            prop_assert!((0.0_f32..=1.0_f32).contains(&confidence));
        }

        /// Identical paths always encode to identical bytes.
        #[test]
        fn prop_to_bytes_deterministic(stage_name in "[a-z]{1,20}") {
            let first = PipelinePath::new(&stage_name).to_bytes();
            let second = PipelinePath::new(&stage_name).to_bytes();
            prop_assert_eq!(first, second);
        }

        /// `recent(take)` returns `min(take, n)` entries.
        #[test]
        fn prop_recent_count_correct(n in 1usize..50, take in 1usize..20) {
            let collector = collector_with("test", "S", n);
            prop_assert_eq!(collector.recent(take).len(), take.min(n));
        }
    }
}