1use crate::execution::TruncationReason;
8
/// A declarative rule describing when a streaming scan should stop early.
///
/// Leaf variants test a single running counter; [`StopCondition::Any`] and
/// [`StopCondition::All`] combine child conditions. Conditions are checked by
/// [`StopEvaluator`], except [`StopCondition::SchemaStable`], which
/// `StopEvaluator` always reports as "not triggered" and which is tracked by
/// [`SchemaStabilityTracker`] instead.
#[derive(Debug, Clone, PartialEq, Default)]
pub enum StopCondition {
    /// Stop once at least this many rows have been processed.
    MaxRows(u64),
    /// Stop once at least this many bytes have been consumed.
    MaxBytes(u64),
    /// Stop once the schema fingerprint has stayed unchanged for this many
    /// consecutive rows (see [`SchemaStabilityTracker`]).
    SchemaStable {
        /// Number of consecutive rows with an unchanged fingerprint required.
        consecutive_stable_rows: u64,
    },
    /// Stop once `rows_processed / estimated_total_rows` reaches this
    /// fraction. Only fires when a non-zero estimate has been supplied via
    /// [`StopEvaluator::with_estimated_total`]. [`StopEvaluator::new`] clamps
    /// the value to `[0.0, 1.0]`.
    ConfidenceThreshold(f64),
    /// Stop once the caller-reported memory fraction reaches this value.
    /// [`StopEvaluator::new`] clamps the value to `[0.0, 1.0]`.
    MemoryPressure(f64),
    /// Stop as soon as any child condition fires. An empty list never fires.
    Any(Vec<StopCondition>),
    /// Stop only once every child condition fires. An empty list never fires.
    All(Vec<StopCondition>),
    /// Never stop early. This is the default.
    #[default]
    Never,
}
44
45impl StopCondition {
46 pub fn schema_inference() -> Self {
49 StopCondition::Any(vec![
50 StopCondition::MaxRows(10_000),
51 StopCondition::SchemaStable {
52 consecutive_stable_rows: 1_000,
53 },
54 ])
55 }
56
57 pub fn quality_sample() -> Self {
59 StopCondition::Any(vec![
60 StopCondition::MaxRows(50_000),
61 StopCondition::MaxBytes(50 * 1024 * 1024),
62 StopCondition::ConfidenceThreshold(0.95),
63 ])
64 }
65}
66
67pub struct StopEvaluator {
72 condition: StopCondition,
73 rows_processed: u64,
74 bytes_consumed: u64,
75 estimated_total_rows: Option<u64>,
76 triggered_reason: Option<TruncationReason>,
77}
78
79impl StopEvaluator {
80 pub fn new(condition: StopCondition) -> Self {
81 let condition = Self::clamp_thresholds(condition);
82 Self {
83 condition,
84 rows_processed: 0,
85 bytes_consumed: 0,
86 estimated_total_rows: None,
87 triggered_reason: None,
88 }
89 }
90
91 fn clamp_thresholds(condition: StopCondition) -> StopCondition {
94 match condition {
95 StopCondition::ConfidenceThreshold(t) => {
96 StopCondition::ConfidenceThreshold(t.clamp(0.0, 1.0))
97 }
98 StopCondition::MemoryPressure(t) => StopCondition::MemoryPressure(t.clamp(0.0, 1.0)),
99 StopCondition::Any(cs) => {
100 StopCondition::Any(cs.into_iter().map(Self::clamp_thresholds).collect())
101 }
102 StopCondition::All(cs) => {
103 StopCondition::All(cs.into_iter().map(Self::clamp_thresholds).collect())
104 }
105 other => other,
106 }
107 }
108
109 pub fn with_estimated_total(mut self, rows: u64) -> Self {
111 self.estimated_total_rows = Some(rows);
112 self
113 }
114
115 pub fn update(&mut self, chunk_rows: u64, chunk_bytes: u64, memory_fraction: f64) -> bool {
123 self.rows_processed += chunk_rows;
124 self.bytes_consumed += chunk_bytes;
125
126 if self.triggered_reason.is_some() {
127 return true;
128 }
129
130 let reason = evaluate(
131 &self.condition,
132 self.rows_processed,
133 self.bytes_consumed,
134 memory_fraction,
135 self.estimated_total_rows,
136 );
137
138 if reason.is_some() {
139 self.triggered_reason = reason;
140 true
141 } else {
142 false
143 }
144 }
145
146 pub fn should_stop(&self) -> bool {
148 self.triggered_reason.is_some()
149 }
150
151 pub fn truncation_reason(&self) -> Option<TruncationReason> {
153 self.triggered_reason.clone()
154 }
155
156 pub fn rows_processed(&self) -> u64 {
158 self.rows_processed
159 }
160
161 pub fn bytes_consumed(&self) -> u64 {
163 self.bytes_consumed
164 }
165}
166
167fn evaluate(
170 condition: &StopCondition,
171 rows: u64,
172 bytes: u64,
173 memory_fraction: f64,
174 estimated_total: Option<u64>,
175) -> Option<TruncationReason> {
176 match condition {
177 StopCondition::MaxRows(limit) => {
178 if rows >= *limit {
179 Some(TruncationReason::MaxRows(*limit))
180 } else {
181 None
182 }
183 }
184 StopCondition::MaxBytes(limit) => {
185 if bytes >= *limit {
186 Some(TruncationReason::MaxBytes(*limit))
187 } else {
188 None
189 }
190 }
191 StopCondition::SchemaStable { .. } => {
192 None
197 }
198 StopCondition::ConfidenceThreshold(threshold) => {
199 if let Some(total) = estimated_total
200 && total > 0
201 {
202 let confidence = rows as f64 / total as f64;
203 if confidence >= *threshold {
204 return Some(TruncationReason::StopCondition(format!(
205 "confidence_threshold({})",
206 threshold
207 )));
208 }
209 }
210 None
211 }
212 StopCondition::MemoryPressure(threshold) => {
213 if memory_fraction >= *threshold {
214 Some(TruncationReason::MemoryPressure)
215 } else {
216 None
217 }
218 }
219 StopCondition::Any(conditions) => {
220 for c in conditions {
221 if let Some(reason) = evaluate(c, rows, bytes, memory_fraction, estimated_total) {
222 return Some(reason);
223 }
224 }
225 None
226 }
227 StopCondition::All(conditions) => {
228 if conditions.is_empty() {
229 return None;
230 }
231 let mut first_reason = None;
233 for c in conditions {
234 match evaluate(c, rows, bytes, memory_fraction, estimated_total) {
235 Some(reason) => {
236 if first_reason.is_none() {
237 first_reason = Some(reason);
238 }
239 }
240 None => return None,
241 }
242 }
243 first_reason
244 }
245 StopCondition::Never => None,
246 }
247}
248
249pub fn schema_stable_threshold(condition: &StopCondition) -> Option<u64> {
253 match condition {
254 StopCondition::SchemaStable {
255 consecutive_stable_rows,
256 } => Some(*consecutive_stable_rows),
257 StopCondition::Any(conditions) | StopCondition::All(conditions) => {
258 conditions.iter().find_map(schema_stable_threshold)
259 }
260 _ => None,
261 }
262}
263
/// Tracks how many consecutive rows have shared the same schema fingerprint.
///
/// This implements the [`StopCondition::SchemaStable`] check, which
/// [`StopEvaluator`] does not perform itself.
#[derive(Debug, Clone)]
pub struct SchemaStabilityTracker {
    /// Consecutive-row count at which the schema counts as stable.
    threshold: u64,
    /// Rows seen since the fingerprint last changed, including the chunk
    /// that introduced the current fingerprint.
    consecutive_stable: u64,
    /// Fingerprint of the most recent chunk; `None` before the first update.
    last_fingerprint: Option<u64>,
}
275
276impl SchemaStabilityTracker {
277 pub fn from_condition(condition: &StopCondition) -> Option<Self> {
280 schema_stable_threshold(condition).map(|threshold| Self {
281 threshold,
282 consecutive_stable: 0,
283 last_fingerprint: None,
284 })
285 }
286
287 pub fn update(&mut self, fingerprint: u64, chunk_rows: u64) -> bool {
291 match self.last_fingerprint {
292 Some(prev) if prev == fingerprint => {
293 self.consecutive_stable += chunk_rows;
294 }
295 _ => {
296 self.consecutive_stable = chunk_rows;
297 self.last_fingerprint = Some(fingerprint);
298 }
299 }
300 self.consecutive_stable >= self.threshold
301 }
302
303 pub fn truncation_reason(&self) -> TruncationReason {
305 TruncationReason::StopCondition(format!("schema_stable({})", self.threshold))
306 }
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_max_rows_stops() {
        let mut eval = StopEvaluator::new(StopCondition::MaxRows(100));
        assert!(!eval.update(50, 0, 0.0));
        assert!(!eval.should_stop());
        assert!(eval.update(50, 0, 0.0));
        assert!(eval.should_stop());
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxRows(100))
        );
    }

    #[test]
    fn test_max_bytes_stops() {
        let mut eval = StopEvaluator::new(StopCondition::MaxBytes(1000));
        assert!(!eval.update(10, 500, 0.0));
        assert!(eval.update(10, 600, 0.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxBytes(1000))
        );
    }

    #[test]
    fn test_memory_pressure_stops() {
        let mut eval = StopEvaluator::new(StopCondition::MemoryPressure(0.9));
        assert!(!eval.update(100, 0, 0.5));
        assert!(eval.update(100, 0, 0.95));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MemoryPressure)
        );
    }

    #[test]
    fn test_confidence_threshold_without_estimate() {
        // Without an estimated total the ratio is undefined, so it never fires.
        let mut eval = StopEvaluator::new(StopCondition::ConfidenceThreshold(0.95));
        assert!(!eval.update(100_000, 0, 0.0));
        assert!(!eval.should_stop());
    }

    #[test]
    fn test_confidence_threshold_with_estimate() {
        let mut eval =
            StopEvaluator::new(StopCondition::ConfidenceThreshold(0.95)).with_estimated_total(1000);
        assert!(!eval.update(900, 0, 0.0)); // 90% < 95%
        assert!(eval.update(100, 0, 0.0)); // 100% >= 95%
    }

    #[test]
    fn test_confidence_threshold_zero_estimate_never_fires() {
        // A zero estimate is treated as "unknown" to avoid dividing by zero.
        let mut eval =
            StopEvaluator::new(StopCondition::ConfidenceThreshold(0.0)).with_estimated_total(0);
        assert!(!eval.update(10, 0, 0.0));
        assert!(!eval.should_stop());
    }

    #[test]
    fn test_thresholds_are_clamped() {
        // 1.5 is clamped to 1.0, so the condition fires once rows == estimate.
        let mut eval =
            StopEvaluator::new(StopCondition::ConfidenceThreshold(1.5)).with_estimated_total(100);
        assert!(!eval.update(99, 0, 0.0));
        assert!(eval.update(1, 0, 0.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::StopCondition(
                "confidence_threshold(1)".to_string()
            ))
        );

        // Clamping also applies inside composite conditions.
        let mut eval =
            StopEvaluator::new(StopCondition::Any(vec![StopCondition::MemoryPressure(2.0)]));
        assert!(eval.update(0, 0, 1.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MemoryPressure)
        );
    }

    #[test]
    fn test_never_never_stops() {
        let mut eval = StopEvaluator::new(StopCondition::Never);
        for _ in 0..100 {
            assert!(!eval.update(1_000_000, 1_000_000, 1.0));
        }
        assert!(!eval.should_stop());
    }

    #[test]
    fn test_any_stops_on_first() {
        let condition = StopCondition::Any(vec![
            StopCondition::MaxRows(100),
            StopCondition::MaxBytes(1_000_000),
        ]);
        let mut eval = StopEvaluator::new(condition);
        // Only the row limit is met; its reason is reported.
        assert!(eval.update(100, 500, 0.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxRows(100))
        );
    }

    #[test]
    fn test_any_empty_never_triggers() {
        let mut eval = StopEvaluator::new(StopCondition::Any(vec![]));
        assert!(!eval.update(100, 100, 1.0));
    }

    #[test]
    fn test_all_requires_all() {
        let condition = StopCondition::All(vec![
            StopCondition::MaxRows(100),
            StopCondition::MaxBytes(1000),
        ]);
        let mut eval = StopEvaluator::new(condition);
        // Rows met, bytes not yet (500 < 1000).
        assert!(!eval.update(100, 500, 0.0));
        // Now both are met; the first child's reason is reported.
        assert!(eval.update(0, 600, 0.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxRows(100))
        );
    }

    #[test]
    fn test_all_empty_never_triggers() {
        let mut eval = StopEvaluator::new(StopCondition::All(vec![]));
        assert!(!eval.update(100, 100, 1.0));
    }

    #[test]
    fn test_schema_stable_not_evaluated_by_stop_evaluator() {
        // SchemaStable is handled by SchemaStabilityTracker, so on its own
        // (or inside All) it never stops the evaluator.
        let mut eval = StopEvaluator::new(StopCondition::SchemaStable {
            consecutive_stable_rows: 1,
        });
        assert!(!eval.update(1_000, 0, 0.0));

        let mut eval = StopEvaluator::new(StopCondition::All(vec![
            StopCondition::MaxRows(10),
            StopCondition::SchemaStable {
                consecutive_stable_rows: 1,
            },
        ]));
        assert!(!eval.update(1_000, 0, 0.0));
    }

    #[test]
    fn test_convenience_schema_inference() {
        let condition = StopCondition::schema_inference();
        match &condition {
            StopCondition::Any(conditions) => {
                assert_eq!(conditions.len(), 2);
                assert!(matches!(conditions[0], StopCondition::MaxRows(10_000)));
                assert!(matches!(
                    conditions[1],
                    StopCondition::SchemaStable {
                        consecutive_stable_rows: 1_000
                    }
                ));
            }
            _ => panic!("Expected Any variant"),
        }
    }

    #[test]
    fn test_convenience_quality_sample() {
        let condition = StopCondition::quality_sample();
        match &condition {
            StopCondition::Any(conditions) => {
                assert_eq!(conditions.len(), 3);
                assert!(matches!(conditions[0], StopCondition::MaxRows(50_000)));
                assert!(matches!(conditions[1], StopCondition::MaxBytes(52_428_800)));
                assert!(matches!(
                    conditions[2],
                    StopCondition::ConfidenceThreshold(t) if (t - 0.95).abs() < f64::EPSILON
                ));
            }
            _ => panic!("Expected Any variant"),
        }
    }

    #[test]
    fn test_once_triggered_stays_triggered() {
        let mut eval = StopEvaluator::new(StopCondition::MaxRows(10));
        assert!(eval.update(10, 0, 0.0));
        // Further updates keep reporting "stop".
        assert!(eval.update(5, 0, 0.0));
        assert!(eval.should_stop());
    }

    #[test]
    fn test_schema_stability_tracker() {
        let condition = StopCondition::SchemaStable {
            consecutive_stable_rows: 3,
        };
        let mut tracker = SchemaStabilityTracker::from_condition(&condition).unwrap();

        let fp: u64 = 0xABCD;
        assert!(!tracker.update(fp, 1)); // 1 stable row
        assert!(!tracker.update(fp, 1)); // 2 stable rows
        assert!(tracker.update(fp, 1)); // 3 stable rows: threshold reached
    }

    #[test]
    fn test_schema_stability_tracker_resets_on_change() {
        let condition = StopCondition::SchemaStable {
            consecutive_stable_rows: 3,
        };
        let mut tracker = SchemaStabilityTracker::from_condition(&condition).unwrap();

        let fp1: u64 = 0x1111;
        let fp2: u64 = 0x2222;

        assert!(!tracker.update(fp1, 1)); // 1
        assert!(!tracker.update(fp1, 1)); // 2
        assert!(!tracker.update(fp2, 1)); // schema changed: reset to 1
        assert!(!tracker.update(fp2, 1)); // 2
        assert!(tracker.update(fp2, 1)); // 3: threshold reached
    }

    #[test]
    fn test_schema_stability_tracker_large_first_chunk() {
        let condition = StopCondition::SchemaStable {
            consecutive_stable_rows: 3,
        };
        let mut tracker = SchemaStabilityTracker::from_condition(&condition).unwrap();
        // The chunk that introduces a schema counts toward its own stability.
        assert!(tracker.update(0x42, 10));
        assert_eq!(
            tracker.truncation_reason(),
            TruncationReason::StopCondition("schema_stable(3)".to_string())
        );
    }

    #[test]
    fn test_schema_stability_tracker_absent() {
        assert!(SchemaStabilityTracker::from_condition(&StopCondition::MaxRows(10)).is_none());
        assert!(SchemaStabilityTracker::from_condition(&StopCondition::Never).is_none());
    }

    #[test]
    fn test_schema_stable_threshold_extraction() {
        assert_eq!(schema_stable_threshold(&StopCondition::Never), None);
        assert_eq!(
            schema_stable_threshold(&StopCondition::SchemaStable {
                consecutive_stable_rows: 500
            }),
            Some(500)
        );

        // Found inside a composite condition.
        let nested = StopCondition::Any(vec![
            StopCondition::MaxRows(100),
            StopCondition::SchemaStable {
                consecutive_stable_rows: 200,
            },
        ]);
        assert_eq!(schema_stable_threshold(&nested), Some(200));

        // Depth-first, declaration order: the deeper-but-earlier rule wins.
        let deep = StopCondition::Any(vec![
            StopCondition::All(vec![StopCondition::SchemaStable {
                consecutive_stable_rows: 5,
            }]),
            StopCondition::SchemaStable {
                consecutive_stable_rows: 7,
            },
        ]);
        assert_eq!(schema_stable_threshold(&deep), Some(5));
    }

    #[test]
    fn test_rows_and_bytes_accessors() {
        let mut eval = StopEvaluator::new(StopCondition::Never);
        eval.update(100, 500, 0.0);
        eval.update(200, 1000, 0.0);
        assert_eq!(eval.rows_processed(), 300);
        assert_eq!(eval.bytes_consumed(), 1500);
    }
}