// Source: dataprof_core/stop_condition.rs

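//! Composable stop conditions for early termination of stream-based profiling.
//!
//! A [`StopCondition`] describes *when* to stop; a [`StopEvaluator`] is the
//! mutable runtime checker that tracks counters and evaluates the condition
//! after each chunk.
//!
//! `SchemaStable` is not decided by the evaluator itself: engines pair it with
//! a [`SchemaStabilityTracker`], which watches a fingerprint of the column types.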
6
7use crate::execution::TruncationReason;
8
/// A composable condition that can trigger early termination of profiling.
///
/// Conditions are checked per-chunk (not per-row) for performance, so the
/// actual row count at termination may slightly exceed the configured limit.
///
/// `PartialEq` is derived so conditions can be compared directly (for example
/// in tests or when de-duplicating configuration). `Eq` is intentionally not
/// derived because two variants carry `f64` payloads.
#[derive(Debug, Clone, Default, PartialEq)]
pub enum StopCondition {
    /// Stop after processing this many rows.
    MaxRows(u64),
    /// Stop after consuming this many bytes from the source.
    MaxBytes(u64),
    /// Stop when column types have not changed for approximately N rows
    /// (accumulated across chunks).
    SchemaStable {
        /// Approximate number of rows with no type changes before stopping.
        /// Rows are accumulated per-chunk, so the actual count depends on
        /// chunk granularity.
        consecutive_stable_rows: u64,
    },
    /// Stop when `rows_processed / estimated_total >= threshold`.
    ///
    /// Only meaningful when an estimated total row count is available.
    /// When no estimate exists, this condition is inert.
    /// [`StopEvaluator::new`] clamps the value to `0.0..=1.0`.
    ConfidenceThreshold(f64),
    /// Stop when memory usage reaches this fraction of the configured limit.
    ///
    /// Value in `0.0..=1.0` (e.g., `0.9` = 90% of memory limit).
    /// [`StopEvaluator::new`] clamps out-of-range values into that interval.
    MemoryPressure(f64),
    /// Stop when **any** sub-condition triggers. An empty list never triggers.
    Any(Vec<StopCondition>),
    /// Stop when **all** sub-conditions have triggered. An empty list never
    /// triggers.
    All(Vec<StopCondition>),
    /// Never stop early — process the entire stream (default).
    #[default]
    Never,
}
44
45impl StopCondition {
46    /// Preset for schema-only profiling: stop after 10K rows or when schema
47    /// stabilizes (no type changes for 1,000 consecutive rows).
48    pub fn schema_inference() -> Self {
49        StopCondition::Any(vec![
50            StopCondition::MaxRows(10_000),
51            StopCondition::SchemaStable {
52                consecutive_stable_rows: 1_000,
53            },
54        ])
55    }
56
57    /// Preset for quality sampling: stop after 50K rows, 50 MB, or 95% confidence.
58    pub fn quality_sample() -> Self {
59        StopCondition::Any(vec![
60            StopCondition::MaxRows(50_000),
61            StopCondition::MaxBytes(50 * 1024 * 1024),
62            StopCondition::ConfidenceThreshold(0.95),
63        ])
64    }
65}
66
67/// Runtime evaluator that checks a [`StopCondition`] against accumulated counters.
68///
69/// Create one before the processing loop and call [`update`](Self::update)
70/// after each chunk. When it returns `true`, profiling should stop.
71pub struct StopEvaluator {
72    condition: StopCondition,
73    rows_processed: u64,
74    bytes_consumed: u64,
75    estimated_total_rows: Option<u64>,
76    triggered_reason: Option<TruncationReason>,
77}
78
79impl StopEvaluator {
80    pub fn new(condition: StopCondition) -> Self {
81        let condition = Self::clamp_thresholds(condition);
82        Self {
83            condition,
84            rows_processed: 0,
85            bytes_consumed: 0,
86            estimated_total_rows: None,
87            triggered_reason: None,
88        }
89    }
90
91    /// Clamp `ConfidenceThreshold` and `MemoryPressure` values to `0.0..=1.0`,
92    /// recursing into `Any`/`All` composites.
93    fn clamp_thresholds(condition: StopCondition) -> StopCondition {
94        match condition {
95            StopCondition::ConfidenceThreshold(t) => {
96                StopCondition::ConfidenceThreshold(t.clamp(0.0, 1.0))
97            }
98            StopCondition::MemoryPressure(t) => StopCondition::MemoryPressure(t.clamp(0.0, 1.0)),
99            StopCondition::Any(cs) => {
100                StopCondition::Any(cs.into_iter().map(Self::clamp_thresholds).collect())
101            }
102            StopCondition::All(cs) => {
103                StopCondition::All(cs.into_iter().map(Self::clamp_thresholds).collect())
104            }
105            other => other,
106        }
107    }
108
109    /// Provide an estimated total row count (enables `ConfidenceThreshold`).
110    pub fn with_estimated_total(mut self, rows: u64) -> Self {
111        self.estimated_total_rows = Some(rows);
112        self
113    }
114
115    /// Update counters and evaluate the stop condition.
116    ///
117    /// Returns `true` when profiling should stop.
118    ///
119    /// - `chunk_rows`: number of rows in the chunk just processed
120    /// - `chunk_bytes`: number of bytes consumed by this chunk
121    /// - `memory_fraction`: current memory usage as a fraction of the limit (`0.0..1.0`)
122    pub fn update(&mut self, chunk_rows: u64, chunk_bytes: u64, memory_fraction: f64) -> bool {
123        self.rows_processed += chunk_rows;
124        self.bytes_consumed += chunk_bytes;
125
126        if self.triggered_reason.is_some() {
127            return true;
128        }
129
130        let reason = evaluate(
131            &self.condition,
132            self.rows_processed,
133            self.bytes_consumed,
134            memory_fraction,
135            self.estimated_total_rows,
136        );
137
138        if reason.is_some() {
139            self.triggered_reason = reason;
140            true
141        } else {
142            false
143        }
144    }
145
146    /// Returns `true` if a stop condition has already been triggered.
147    pub fn should_stop(&self) -> bool {
148        self.triggered_reason.is_some()
149    }
150
151    /// The reason profiling stopped, mapped to [`TruncationReason`].
152    pub fn truncation_reason(&self) -> Option<TruncationReason> {
153        self.triggered_reason.clone()
154    }
155
156    /// Total rows processed so far.
157    pub fn rows_processed(&self) -> u64 {
158        self.rows_processed
159    }
160
161    /// Total bytes consumed so far.
162    pub fn bytes_consumed(&self) -> u64 {
163        self.bytes_consumed
164    }
165}
166
167/// Recursively evaluate a condition against current counters.
168/// Returns `Some(reason)` if the condition is met.
169fn evaluate(
170    condition: &StopCondition,
171    rows: u64,
172    bytes: u64,
173    memory_fraction: f64,
174    estimated_total: Option<u64>,
175) -> Option<TruncationReason> {
176    match condition {
177        StopCondition::MaxRows(limit) => {
178            if rows >= *limit {
179                Some(TruncationReason::MaxRows(*limit))
180            } else {
181                None
182            }
183        }
184        StopCondition::MaxBytes(limit) => {
185            if bytes >= *limit {
186                Some(TruncationReason::MaxBytes(*limit))
187            } else {
188                None
189            }
190        }
191        StopCondition::SchemaStable { .. } => {
192            // SchemaStable requires column type tracking which is handled
193            // at the engine level via `SchemaStabilityTracker`. The evaluator
194            // alone cannot detect schema changes.
195            // See: IncrementalProfiler and AsyncStreamingProfiler integration.
196            None
197        }
198        StopCondition::ConfidenceThreshold(threshold) => {
199            if let Some(total) = estimated_total
200                && total > 0
201            {
202                let confidence = rows as f64 / total as f64;
203                if confidence >= *threshold {
204                    return Some(TruncationReason::StopCondition(format!(
205                        "confidence_threshold({})",
206                        threshold
207                    )));
208                }
209            }
210            None
211        }
212        StopCondition::MemoryPressure(threshold) => {
213            if memory_fraction >= *threshold {
214                Some(TruncationReason::MemoryPressure)
215            } else {
216                None
217            }
218        }
219        StopCondition::Any(conditions) => {
220            for c in conditions {
221                if let Some(reason) = evaluate(c, rows, bytes, memory_fraction, estimated_total) {
222                    return Some(reason);
223                }
224            }
225            None
226        }
227        StopCondition::All(conditions) => {
228            if conditions.is_empty() {
229                return None;
230            }
231            // All must have triggered — collect reasons, return first
232            let mut first_reason = None;
233            for c in conditions {
234                match evaluate(c, rows, bytes, memory_fraction, estimated_total) {
235                    Some(reason) => {
236                        if first_reason.is_none() {
237                            first_reason = Some(reason);
238                        }
239                    }
240                    None => return None,
241                }
242            }
243            first_reason
244        }
245        StopCondition::Never => None,
246    }
247}
248
249/// Extracts the `consecutive_stable_rows` threshold from a `StopCondition`,
250/// searching through `Any`/`All` composites. Returns `None` if no
251/// `SchemaStable` variant is present.
252pub fn schema_stable_threshold(condition: &StopCondition) -> Option<u64> {
253    match condition {
254        StopCondition::SchemaStable {
255            consecutive_stable_rows,
256        } => Some(*consecutive_stable_rows),
257        StopCondition::Any(conditions) | StopCondition::All(conditions) => {
258            conditions.iter().find_map(schema_stable_threshold)
259        }
260        _ => None,
261    }
262}
263
/// Tracks consecutive rows where the schema (column types) has not changed.
/// Used by engines to implement `SchemaStable` stop conditions.
///
/// Accepts a fingerprint (hash) of the column types and accumulates rows
/// across chunks. When the accumulated stable-row count reaches the
/// threshold, the tracker signals that profiling may stop.
#[derive(Debug, Clone)]
pub struct SchemaStabilityTracker {
    /// Number of stable rows required before signalling a stop.
    threshold: u64,
    /// Rows accumulated since the fingerprint last changed.
    consecutive_stable: u64,
    /// Fingerprint seen on the previous update; `None` before the first one.
    last_fingerprint: Option<u64>,
}
275
276impl SchemaStabilityTracker {
277    /// Create a tracker for the given threshold. Returns `None` if no
278    /// `SchemaStable` condition is present.
279    pub fn from_condition(condition: &StopCondition) -> Option<Self> {
280        schema_stable_threshold(condition).map(|threshold| Self {
281            threshold,
282            consecutive_stable: 0,
283            last_fingerprint: None,
284        })
285    }
286
287    /// Update with the current schema fingerprint and the number of rows in
288    /// the chunk. Returns `true` when the accumulated stable-row count reaches
289    /// the threshold.
290    pub fn update(&mut self, fingerprint: u64, chunk_rows: u64) -> bool {
291        match self.last_fingerprint {
292            Some(prev) if prev == fingerprint => {
293                self.consecutive_stable += chunk_rows;
294            }
295            _ => {
296                self.consecutive_stable = chunk_rows;
297                self.last_fingerprint = Some(fingerprint);
298            }
299        }
300        self.consecutive_stable >= self.threshold
301    }
302
303    /// The truncation reason when schema stability is reached.
304    pub fn truncation_reason(&self) -> TruncationReason {
305        TruncationReason::StopCondition(format!("schema_stable({})", self.threshold))
306    }
307}
308
/// Unit tests for stop conditions, the evaluator and the schema tracker.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_max_rows_stops() {
        let mut eval = StopEvaluator::new(StopCondition::MaxRows(100));
        assert!(!eval.update(50, 0, 0.0));
        assert!(!eval.should_stop());
        assert!(eval.update(50, 0, 0.0));
        assert!(eval.should_stop());
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxRows(100))
        );
    }

    #[test]
    fn test_max_bytes_stops() {
        let mut eval = StopEvaluator::new(StopCondition::MaxBytes(1000));
        assert!(!eval.update(10, 500, 0.0));
        assert!(eval.update(10, 600, 0.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxBytes(1000))
        );
    }

    #[test]
    fn test_memory_pressure_stops() {
        let mut eval = StopEvaluator::new(StopCondition::MemoryPressure(0.9));
        assert!(!eval.update(100, 0, 0.5));
        assert!(eval.update(100, 0, 0.95));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MemoryPressure)
        );
    }

    #[test]
    fn test_confidence_threshold_without_estimate() {
        // Without estimated_total, ConfidenceThreshold is inert
        let mut eval = StopEvaluator::new(StopCondition::ConfidenceThreshold(0.95));
        assert!(!eval.update(100_000, 0, 0.0));
        assert!(!eval.should_stop());
    }

    #[test]
    fn test_confidence_threshold_with_zero_estimate() {
        // A zero estimate is treated like a missing one (no division by zero)
        let mut eval =
            StopEvaluator::new(StopCondition::ConfidenceThreshold(0.95)).with_estimated_total(0);
        assert!(!eval.update(100_000, 0, 0.0));
        assert!(!eval.should_stop());
    }

    #[test]
    fn test_confidence_threshold_with_estimate() {
        let mut eval =
            StopEvaluator::new(StopCondition::ConfidenceThreshold(0.95)).with_estimated_total(1000);
        assert!(!eval.update(900, 0, 0.0));
        assert!(eval.update(100, 0, 0.0)); // 1000/1000 = 1.0 >= 0.95
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::StopCondition(
                "confidence_threshold(0.95)".to_string()
            ))
        );
    }

    #[test]
    fn test_thresholds_are_clamped() {
        // 2.0 clamps to 1.0 (also inside a composite), so full coverage triggers
        let nested = StopCondition::Any(vec![StopCondition::ConfidenceThreshold(2.0)]);
        let mut eval = StopEvaluator::new(nested).with_estimated_total(100);
        assert!(!eval.update(99, 0, 0.0));
        assert!(eval.update(1, 0, 0.0));

        // 5.0 clamps to 1.0, so a memory fraction of 1.0 triggers
        let mut eval = StopEvaluator::new(StopCondition::MemoryPressure(5.0));
        assert!(!eval.update(0, 0, 0.99));
        assert!(eval.update(0, 0, 1.0));
    }

    #[test]
    fn test_never_never_stops() {
        let mut eval = StopEvaluator::new(StopCondition::Never);
        for _ in 0..100 {
            assert!(!eval.update(1_000_000, 1_000_000, 1.0));
        }
        assert!(!eval.should_stop());
    }

    #[test]
    fn test_any_stops_on_first() {
        let condition = StopCondition::Any(vec![
            StopCondition::MaxRows(100),
            StopCondition::MaxBytes(1_000_000),
        ]);
        let mut eval = StopEvaluator::new(condition);
        // MaxRows triggers first (100 rows, only 500 bytes)
        assert!(eval.update(100, 500, 0.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxRows(100))
        );
    }

    #[test]
    fn test_any_empty_never_triggers() {
        let mut eval = StopEvaluator::new(StopCondition::Any(vec![]));
        assert!(!eval.update(100, 100, 1.0));
        assert!(!eval.should_stop());
    }

    #[test]
    fn test_all_requires_all() {
        let condition = StopCondition::All(vec![
            StopCondition::MaxRows(100),
            StopCondition::MaxBytes(1000),
        ]);
        let mut eval = StopEvaluator::new(condition);
        // Only rows met, bytes not yet
        assert!(!eval.update(100, 500, 0.0));
        // Now both met
        assert!(eval.update(0, 600, 0.0));
        // The reported reason is that of the first sub-condition
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxRows(100))
        );
    }

    #[test]
    fn test_all_empty_never_triggers() {
        let mut eval = StopEvaluator::new(StopCondition::All(vec![]));
        assert!(!eval.update(100, 100, 1.0));
    }

    #[test]
    fn test_convenience_schema_inference() {
        let condition = StopCondition::schema_inference();
        match &condition {
            StopCondition::Any(conditions) => {
                assert_eq!(conditions.len(), 2);
                assert!(matches!(conditions[0], StopCondition::MaxRows(10_000)));
                assert!(matches!(
                    conditions[1],
                    StopCondition::SchemaStable {
                        consecutive_stable_rows: 1_000
                    }
                ));
            }
            _ => panic!("Expected Any variant"),
        }
    }

    #[test]
    fn test_convenience_quality_sample() {
        let condition = StopCondition::quality_sample();
        match &condition {
            StopCondition::Any(conditions) => {
                assert_eq!(conditions.len(), 3);
                assert!(matches!(conditions[0], StopCondition::MaxRows(50_000)));
                assert!(matches!(conditions[1], StopCondition::MaxBytes(52_428_800)));
                // The third member was previously counted but never checked
                assert!(matches!(
                    conditions[2],
                    StopCondition::ConfidenceThreshold(t) if (t - 0.95).abs() < f64::EPSILON
                ));
            }
            _ => panic!("Expected Any variant"),
        }
    }

    #[test]
    fn test_once_triggered_stays_triggered() {
        let mut eval = StopEvaluator::new(StopCondition::MaxRows(10));
        assert!(eval.update(10, 0, 0.0));
        // Calling update again still returns true
        assert!(eval.update(5, 0, 0.0));
        assert!(eval.should_stop());
        // The latched reason is unchanged, while counters keep advancing
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxRows(10))
        );
        assert_eq!(eval.rows_processed(), 15);
    }

    #[test]
    fn test_schema_stability_tracker() {
        let condition = StopCondition::SchemaStable {
            consecutive_stable_rows: 3,
        };
        let mut tracker = SchemaStabilityTracker::from_condition(&condition).unwrap();

        let fp: u64 = 0xABCD;
        assert!(!tracker.update(fp, 1)); // consecutive = 1
        assert!(!tracker.update(fp, 1)); // consecutive = 2
        assert!(tracker.update(fp, 1)); // consecutive = 3 -> triggers
    }

    #[test]
    fn test_schema_stability_tracker_resets_on_change() {
        let condition = StopCondition::SchemaStable {
            consecutive_stable_rows: 3,
        };
        let mut tracker = SchemaStabilityTracker::from_condition(&condition).unwrap();

        let fp1: u64 = 0x1111;
        let fp2: u64 = 0x2222;

        assert!(!tracker.update(fp1, 1)); // consecutive = 1
        assert!(!tracker.update(fp1, 1)); // consecutive = 2
        // Schema changes - counter resets
        assert!(!tracker.update(fp2, 1)); // consecutive = 1
        assert!(!tracker.update(fp2, 1)); // consecutive = 2
        assert!(tracker.update(fp2, 1)); // consecutive = 3 -> triggers
    }

    #[test]
    fn test_schema_stability_tracker_reason_and_absence() {
        let condition = StopCondition::SchemaStable {
            consecutive_stable_rows: 3,
        };
        let tracker = SchemaStabilityTracker::from_condition(&condition).unwrap();
        assert_eq!(
            tracker.truncation_reason(),
            TruncationReason::StopCondition("schema_stable(3)".to_string())
        );
        // No SchemaStable in the condition -> no tracker
        assert!(SchemaStabilityTracker::from_condition(&StopCondition::MaxRows(1)).is_none());
    }

    #[test]
    fn test_schema_stable_threshold_extraction() {
        assert_eq!(schema_stable_threshold(&StopCondition::Never), None);
        assert_eq!(
            schema_stable_threshold(&StopCondition::SchemaStable {
                consecutive_stable_rows: 500
            }),
            Some(500)
        );
        // Nested in Any
        let nested = StopCondition::Any(vec![
            StopCondition::MaxRows(100),
            StopCondition::SchemaStable {
                consecutive_stable_rows: 200,
            },
        ]);
        assert_eq!(schema_stable_threshold(&nested), Some(200));
        // Nested two levels deep, inside All
        let deep = StopCondition::All(vec![
            StopCondition::MaxBytes(1),
            StopCondition::Any(vec![StopCondition::SchemaStable {
                consecutive_stable_rows: 42,
            }]),
        ]);
        assert_eq!(schema_stable_threshold(&deep), Some(42));
    }

    #[test]
    fn test_rows_and_bytes_accessors() {
        let mut eval = StopEvaluator::new(StopCondition::Never);
        eval.update(100, 500, 0.0);
        eval.update(200, 1000, 0.0);
        assert_eq!(eval.rows_processed(), 300);
        assert_eq!(eval.bytes_consumed(), 1500);
    }
}