//! Profiling execution metadata (`dataprof_core/execution.rs`).

1/// Reason why profiling was truncated before exhausting the source.
2#[derive(Debug, Clone, PartialEq, serde::Serialize, serde::Deserialize)]
3pub enum TruncationReason {
4    /// Stopped after processing a maximum number of rows.
5    MaxRows(u64),
6    /// Stopped after consuming a maximum number of bytes.
7    MaxBytes(u64),
8    /// Stopped due to memory pressure.
9    MemoryPressure,
10    /// Stopped due to a user-defined stop condition.
11    StopCondition(String),
12    /// The input stream was closed by the producer.
13    StreamClosed,
14    /// Stopped due to a timeout.
15    Timeout,
16}
17
18/// Metadata about the profiling execution.
19#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
20pub struct ExecutionMetadata {
21    /// Number of rows actually processed or analyzed.
22    pub rows_processed: usize,
23    /// Number of bytes consumed from the source, if known.
24    #[serde(skip_serializing_if = "Option::is_none")]
25    pub bytes_consumed: Option<u64>,
26    /// Number of columns detected in the data.
27    pub columns_detected: usize,
28    /// Total execution time in milliseconds.
29    pub scan_time_ms: u128,
30    /// Throughput in rows per second, auto-calculated when possible.
31    #[serde(skip_serializing_if = "Option::is_none")]
32    pub throughput_rows_sec: Option<f64>,
33    /// Peak memory usage in megabytes, if tracked.
34    #[serde(skip_serializing_if = "Option::is_none")]
35    pub memory_peak_mb: Option<f64>,
36    /// Number of errors encountered during profiling.
37    pub error_count: usize,
38    /// Whether the entire source was consumed.
39    pub source_exhausted: bool,
40    /// If the source was not exhausted, why processing stopped.
41    #[serde(skip_serializing_if = "Option::is_none")]
42    pub truncation_reason: Option<TruncationReason>,
43    /// Whether sampling was applied.
44    pub sampling_applied: bool,
45    /// Ratio of rows analyzed to total rows when sampling is meaningful.
46    #[serde(skip_serializing_if = "Option::is_none")]
47    pub sampling_ratio: Option<f64>,
48}
49
50impl ExecutionMetadata {
51    /// Create new execution metadata with throughput calculated automatically.
52    pub fn new(rows_processed: usize, columns_detected: usize, scan_time_ms: u128) -> Self {
53        let throughput_rows_sec = if scan_time_ms > 0 {
54            Some(rows_processed as f64 / (scan_time_ms as f64 / 1000.0))
55        } else {
56            None
57        };
58
59        Self {
60            rows_processed,
61            bytes_consumed: None,
62            columns_detected,
63            scan_time_ms,
64            throughput_rows_sec,
65            memory_peak_mb: None,
66            error_count: 0,
67            source_exhausted: true,
68            truncation_reason: None,
69            sampling_applied: false,
70            sampling_ratio: None,
71        }
72    }
73
74    /// Set sampling information.
75    pub fn with_sampling(mut self, ratio: f64) -> Self {
76        self.sampling_applied = true;
77        self.sampling_ratio = Some(ratio);
78        self
79    }
80
81    /// Explicitly set whether the source was fully consumed.
82    pub fn with_source_exhausted(mut self, exhausted: bool) -> Self {
83        self.source_exhausted = exhausted;
84        self
85    }
86
87    /// Mark the execution as truncated.
88    pub fn with_truncation(mut self, reason: TruncationReason) -> Self {
89        self.source_exhausted = false;
90        self.truncation_reason = Some(reason);
91        self
92    }
93
94    /// Set the number of bytes consumed from the source.
95    pub fn with_bytes_consumed(mut self, bytes: u64) -> Self {
96        self.bytes_consumed = Some(bytes);
97        self
98    }
99
100    /// Set the error count.
101    pub fn with_error_count(mut self, count: usize) -> Self {
102        self.error_count = count;
103        self
104    }
105
106    /// Set peak memory usage.
107    pub fn with_memory_peak_mb(mut self, mb: f64) -> Self {
108        self.memory_peak_mb = Some(mb);
109        self
110    }
111}
112
#[cfg(test)]
mod tests {
    use super::*;

    /// 1000 rows in 500 ms is 2000 rows/s; the other fields keep their defaults.
    #[test]
    fn test_execution_metadata_throughput_calculation() {
        let meta = ExecutionMetadata::new(1000, 5, 500);
        let throughput = meta
            .throughput_rows_sec
            .expect("a non-zero scan time must yield a throughput");
        assert!((throughput - 2000.0).abs() < 1.0);
        assert!(meta.source_exhausted);
        assert!(meta.truncation_reason.is_none());
        assert!(!meta.sampling_applied);
        assert!(meta.sampling_ratio.is_none());
    }

    /// A zero scan time cannot produce a rate.
    #[test]
    fn test_execution_metadata_zero_time_no_throughput() {
        let meta = ExecutionMetadata::new(100, 3, 0);
        assert!(meta.throughput_rows_sec.is_none());
    }

    #[test]
    fn test_execution_metadata_with_sampling() {
        let meta = ExecutionMetadata::new(500, 3, 100).with_sampling(0.5);
        assert!(meta.sampling_applied);
        assert_eq!(meta.sampling_ratio, Some(0.5));
    }

    /// Truncation flips `source_exhausted` and stores the exact reason given.
    #[test]
    fn test_execution_metadata_with_truncation() {
        let meta =
            ExecutionMetadata::new(1000, 5, 200).with_truncation(TruncationReason::MaxRows(1000));
        assert!(!meta.source_exhausted);
        assert_eq!(
            meta.truncation_reason,
            Some(TruncationReason::MaxRows(1000))
        );
    }

    /// The simple setters store exactly what they are given.
    #[test]
    fn test_execution_metadata_builder_setters() {
        let meta = ExecutionMetadata::new(10, 2, 1)
            .with_bytes_consumed(4096)
            .with_error_count(3)
            .with_memory_peak_mb(12.5)
            .with_source_exhausted(false);
        assert_eq!(meta.bytes_consumed, Some(4096));
        assert_eq!(meta.error_count, 3);
        assert_eq!(meta.memory_peak_mb, Some(12.5));
        assert!(!meta.source_exhausted);
        assert!(meta.truncation_reason.is_none());
    }

    /// Every variant must survive a JSON round trip unchanged.
    #[test]
    fn test_truncation_reason_serde_roundtrip() {
        let reasons = [
            TruncationReason::MaxRows(5000),
            TruncationReason::MaxBytes(1_000_000),
            TruncationReason::MemoryPressure,
            TruncationReason::StopCondition("accuracy > 0.95".to_string()),
            TruncationReason::StreamClosed,
            TruncationReason::Timeout,
        ];

        for reason in reasons {
            let json = serde_json::to_string(&reason).unwrap();
            let deserialized: TruncationReason = serde_json::from_str(&json).unwrap();
            // Compare the decoded value itself, not only its re-encoded form.
            assert_eq!(deserialized, reason);
            let json2 = serde_json::to_string(&deserialized).unwrap();
            assert_eq!(json, json2);
        }
    }
}