dataprof_core/
execution.rs1#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
3pub enum TruncationReason {
4 MaxRows(u64),
6 MaxBytes(u64),
8 MemoryPressure,
10 StopCondition(String),
12 StreamClosed,
14 Timeout,
16}
17
18#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
20pub struct ExecutionMetadata {
21 pub rows_processed: usize,
23 #[serde(skip_serializing_if = "Option::is_none")]
25 pub bytes_consumed: Option<u64>,
26 pub columns_detected: usize,
28 pub scan_time_ms: u128,
30 #[serde(skip_serializing_if = "Option::is_none")]
32 pub throughput_rows_sec: Option<f64>,
33 #[serde(skip_serializing_if = "Option::is_none")]
35 pub memory_peak_mb: Option<f64>,
36 pub error_count: usize,
38 pub source_exhausted: bool,
40 #[serde(skip_serializing_if = "Option::is_none")]
42 pub truncation_reason: Option<TruncationReason>,
43 pub sampling_applied: bool,
45 #[serde(skip_serializing_if = "Option::is_none")]
47 pub sampling_ratio: Option<f64>,
48}
49
50impl ExecutionMetadata {
51 pub fn new(rows_processed: usize, columns_detected: usize, scan_time_ms: u128) -> Self {
53 let throughput_rows_sec = if scan_time_ms > 0 {
54 Some(rows_processed as f64 / (scan_time_ms as f64 / 1000.0))
55 } else {
56 None
57 };
58
59 Self {
60 rows_processed,
61 bytes_consumed: None,
62 columns_detected,
63 scan_time_ms,
64 throughput_rows_sec,
65 memory_peak_mb: None,
66 error_count: 0,
67 source_exhausted: true,
68 truncation_reason: None,
69 sampling_applied: false,
70 sampling_ratio: None,
71 }
72 }
73
74 pub fn with_sampling(mut self, ratio: f64) -> Self {
76 self.sampling_applied = true;
77 self.sampling_ratio = Some(ratio);
78 self
79 }
80
81 pub fn with_source_exhausted(mut self, exhausted: bool) -> Self {
83 self.source_exhausted = exhausted;
84 self
85 }
86
87 pub fn with_truncation(mut self, reason: TruncationReason) -> Self {
89 self.source_exhausted = false;
90 self.truncation_reason = Some(reason);
91 self
92 }
93
94 pub fn with_bytes_consumed(mut self, bytes: u64) -> Self {
96 self.bytes_consumed = Some(bytes);
97 self
98 }
99
100 pub fn with_error_count(mut self, count: usize) -> Self {
102 self.error_count = count;
103 self
104 }
105
106 pub fn with_memory_peak_mb(mut self, mb: f64) -> Self {
108 self.memory_peak_mb = Some(mb);
109 self
110 }
111}
112
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_execution_metadata_throughput_calculation() {
        // 1000 rows in 500 ms is exactly 2000 rows/sec.
        let meta = ExecutionMetadata::new(1000, 5, 500);
        let throughput = meta
            .throughput_rows_sec
            .expect("throughput must be computed for a non-zero scan time");
        assert!((throughput - 2000.0).abs() < 1e-9);
        assert!(meta.source_exhausted);
        assert!(!meta.sampling_applied);
        assert!(meta.sampling_ratio.is_none());
    }

    #[test]
    fn test_execution_metadata_zero_time_no_throughput() {
        let meta = ExecutionMetadata::new(100, 3, 0);
        assert!(meta.throughput_rows_sec.is_none());
    }

    #[test]
    fn test_execution_metadata_with_sampling() {
        let meta = ExecutionMetadata::new(500, 3, 100).with_sampling(0.5);
        assert!(meta.sampling_applied);
        assert_eq!(meta.sampling_ratio, Some(0.5));
    }

    #[test]
    fn test_execution_metadata_with_truncation() {
        let meta =
            ExecutionMetadata::new(1000, 5, 200).with_truncation(TruncationReason::MaxRows(1000));
        assert!(!meta.source_exhausted);
        // Pin the exact reason, not just its presence.
        assert_eq!(meta.truncation_reason, Some(TruncationReason::MaxRows(1000)));
    }

    #[test]
    fn test_execution_metadata_optional_builders() {
        let meta = ExecutionMetadata::new(10, 2, 5)
            .with_bytes_consumed(4096)
            .with_error_count(3)
            .with_memory_peak_mb(12.5)
            .with_source_exhausted(false);
        assert_eq!(meta.bytes_consumed, Some(4096));
        assert_eq!(meta.error_count, 3);
        assert_eq!(meta.memory_peak_mb, Some(12.5));
        assert!(!meta.source_exhausted);
        assert!(meta.truncation_reason.is_none());
    }

    #[test]
    fn test_execution_metadata_omits_unset_optional_fields() {
        // Zero scan time leaves every Option field unset, including throughput.
        let meta = ExecutionMetadata::new(10, 2, 0);
        let json = serde_json::to_value(&meta).unwrap();
        for key in [
            "bytes_consumed",
            "throughput_rows_sec",
            "memory_peak_mb",
            "truncation_reason",
            "sampling_ratio",
        ] {
            assert!(json.get(key).is_none(), "{} should be omitted when None", key);
        }
    }

    #[test]
    fn test_truncation_reason_serde_roundtrip() {
        let reasons = [
            TruncationReason::MaxRows(5000),
            TruncationReason::MaxBytes(1_000_000),
            TruncationReason::MemoryPressure,
            TruncationReason::StopCondition("accuracy > 0.95".to_string()),
            TruncationReason::StreamClosed,
            TruncationReason::Timeout,
        ];

        for reason in reasons {
            let json = serde_json::to_string(&reason).unwrap();
            let deserialized: TruncationReason = serde_json::from_str(&json).unwrap();
            // Compare values, not re-serialized text: this also catches a
            // lossy Deserialize that happens to re-serialize identically.
            assert_eq!(deserialized, reason);
        }
    }
}