use crate::types::TruncationReason;
/// Declarative description of when a chunked read should stop early.
///
/// Leaf variants carry one threshold each; `Any` and `All` combine nested
/// conditions. The default is `Never`.
#[derive(Debug, Clone, Default)]
pub enum StopCondition {
    /// Fires once the cumulative row count reaches the given limit.
    MaxRows(u64),
    /// Fires once the cumulative byte count reaches the given limit.
    MaxBytes(u64),
    /// Schema-stability rule. The row/byte evaluator in this file never fires
    /// for this variant on its own; `SchemaStabilityTracker` reads the
    /// threshold and tracks it separately.
    SchemaStable {
        /// Number of consecutive rows with an unchanged schema fingerprint.
        consecutive_stable_rows: u64,
    },
    /// Fires when `rows_processed / estimated_total_rows` reaches the given
    /// fraction. Needs a non-zero row estimate to ever fire.
    ConfidenceThreshold(f64),
    /// Fires when the reported memory fraction reaches the given value.
    MemoryPressure(f64),
    /// Fires as soon as any nested condition fires (checked in order).
    Any(Vec<StopCondition>),
    /// Fires only when every nested condition fires; an empty list never fires.
    All(Vec<StopCondition>),
    /// Never fires.
    #[default]
    Never,
}

impl StopCondition {
    /// Preset for schema inference: stop at 10 000 rows or after 1 000
    /// consecutive schema-stable rows, whichever is reached first.
    pub fn schema_inference() -> Self {
        let row_cap = Self::MaxRows(10_000);
        let stable_schema = Self::SchemaStable {
            consecutive_stable_rows: 1_000,
        };
        Self::Any(vec![row_cap, stable_schema])
    }

    /// Preset for quality sampling: stop at 50 000 rows, 50 MiB, or a
    /// confidence of 0.95, whichever is reached first.
    pub fn quality_sample() -> Self {
        const MIB: u64 = 1024 * 1024;

        Self::Any(vec![
            Self::MaxRows(50_000),
            Self::MaxBytes(50 * MIB),
            Self::ConfidenceThreshold(0.95),
        ])
    }
}
/// Feeds chunk statistics into a [`StopCondition`] and remembers the first
/// reason the condition fired.
pub struct StopEvaluator {
    /// Condition being evaluated, with fractional thresholds already clamped.
    condition: StopCondition,
    /// Running total of rows reported through `update`.
    rows_processed: u64,
    /// Running total of bytes reported through `update`.
    bytes_consumed: u64,
    /// Optional row estimate used by `ConfidenceThreshold`.
    estimated_total_rows: Option<u64>,
    /// First reason the condition fired; once set it is never cleared.
    triggered_reason: Option<TruncationReason>,
}

impl StopEvaluator {
    /// Creates an evaluator with zeroed counters and no row estimate.
    ///
    /// `ConfidenceThreshold` and `MemoryPressure` thresholds are clamped to
    /// `[0.0, 1.0]`, including those nested inside `Any` / `All`.
    pub fn new(condition: StopCondition) -> Self {
        Self {
            condition: Self::clamp_thresholds(condition),
            rows_processed: 0,
            bytes_consumed: 0,
            estimated_total_rows: None,
            triggered_reason: None,
        }
    }

    /// Recursively clamps fractional thresholds to `[0.0, 1.0]`; every other
    /// variant is returned unchanged.
    fn clamp_thresholds(condition: StopCondition) -> StopCondition {
        match condition {
            StopCondition::ConfidenceThreshold(t) => {
                StopCondition::ConfidenceThreshold(t.clamp(0.0, 1.0))
            }
            StopCondition::MemoryPressure(t) => StopCondition::MemoryPressure(t.clamp(0.0, 1.0)),
            StopCondition::Any(cs) => {
                StopCondition::Any(cs.into_iter().map(Self::clamp_thresholds).collect())
            }
            StopCondition::All(cs) => {
                StopCondition::All(cs.into_iter().map(Self::clamp_thresholds).collect())
            }
            other => other,
        }
    }

    /// Builder-style setter for the estimated total row count consulted by
    /// `ConfidenceThreshold`.
    pub fn with_estimated_total(mut self, rows: u64) -> Self {
        self.estimated_total_rows = Some(rows);
        self
    }

    /// Records one chunk and reports whether reading should stop.
    ///
    /// `chunk_rows` and `chunk_bytes` are added to the running totals on every
    /// call, even after the condition has fired. `memory_fraction` is compared
    /// against `MemoryPressure` thresholds for this call only.
    ///
    /// Returns `true` once the condition has fired; the first reason is kept
    /// and later calls do not re-evaluate the condition.
    pub fn update(&mut self, chunk_rows: u64, chunk_bytes: u64, memory_fraction: f64) -> bool {
        // Saturate instead of `+=`: an overflow would panic in debug builds and
        // silently wrap the totals back towards zero in release builds.
        self.rows_processed = self.rows_processed.saturating_add(chunk_rows);
        self.bytes_consumed = self.bytes_consumed.saturating_add(chunk_bytes);

        if self.triggered_reason.is_some() {
            return true;
        }

        self.triggered_reason = evaluate(
            &self.condition,
            self.rows_processed,
            self.bytes_consumed,
            memory_fraction,
            self.estimated_total_rows,
        );
        self.triggered_reason.is_some()
    }

    /// Returns `true` if a previous `update` call fired the condition.
    pub fn should_stop(&self) -> bool {
        self.triggered_reason.is_some()
    }

    /// Returns a copy of the reason the condition fired, if it has.
    pub fn truncation_reason(&self) -> Option<TruncationReason> {
        self.triggered_reason.clone()
    }

    /// Total rows reported so far (saturating at `u64::MAX`).
    pub fn rows_processed(&self) -> u64 {
        self.rows_processed
    }

    /// Total bytes reported so far (saturating at `u64::MAX`).
    pub fn bytes_consumed(&self) -> u64 {
        self.bytes_consumed
    }
}
/// Checks `condition` against the current counters and returns the reason it
/// fired, or `None` if it has not fired.
///
/// * `rows` / `bytes` - cumulative totals compared against `MaxRows` / `MaxBytes`.
/// * `memory_fraction` - compared against `MemoryPressure` thresholds.
/// * `estimated_total` - row estimate used by `ConfidenceThreshold`; without a
///   non-zero estimate that variant never fires.
fn evaluate(
    condition: &StopCondition,
    rows: u64,
    bytes: u64,
    memory_fraction: f64,
    estimated_total: Option<u64>,
) -> Option<TruncationReason> {
    // Recurse into a nested condition with the same counters.
    let check =
        |nested: &StopCondition| evaluate(nested, rows, bytes, memory_fraction, estimated_total);

    match condition {
        StopCondition::MaxRows(limit) => {
            (rows >= *limit).then(|| TruncationReason::MaxRows(*limit))
        }
        StopCondition::MaxBytes(limit) => {
            (bytes >= *limit).then(|| TruncationReason::MaxBytes(*limit))
        }
        // Schema stability cannot be judged from row/byte counters, so it
        // never fires here.
        StopCondition::SchemaStable { .. } => None,
        StopCondition::ConfidenceThreshold(threshold) => match estimated_total {
            Some(total) if total > 0 => {
                // "Confidence" here is the fraction of the estimated rows seen so far.
                let confidence = rows as f64 / total as f64;
                (confidence >= *threshold).then(|| {
                    TruncationReason::StopCondition(format!(
                        "confidence_threshold({})",
                        threshold
                    ))
                })
            }
            _ => None,
        },
        StopCondition::MemoryPressure(threshold) => {
            (memory_fraction >= *threshold).then(|| TruncationReason::MemoryPressure)
        }
        // First nested condition that fires wins.
        StopCondition::Any(conditions) => conditions.iter().find_map(check),
        StopCondition::All(conditions) => {
            let mut outcomes = conditions.iter().map(check);
            // An empty list never fires; otherwise the first nested condition
            // must fire and its reason is the one reported.
            let first_reason = outcomes.next()??;
            // `all` stops at the first nested condition that has not fired.
            if outcomes.all(|outcome| outcome.is_some()) {
                Some(first_reason)
            } else {
                None
            }
        }
        StopCondition::Never => None,
    }
}
/// Finds the first `SchemaStable` threshold inside `condition`.
///
/// `Any` / `All` are searched depth-first in declaration order. Returns `None`
/// when the condition contains no `SchemaStable` variant.
pub fn schema_stable_threshold(condition: &StopCondition) -> Option<u64> {
    let children = match condition {
        StopCondition::SchemaStable {
            consecutive_stable_rows,
        } => return Some(*consecutive_stable_rows),
        StopCondition::Any(children) | StopCondition::All(children) => children,
        _ => return None,
    };

    for child in children {
        if let Some(threshold) = schema_stable_threshold(child) {
            return Some(threshold);
        }
    }
    None
}
/// Counts consecutive rows that share one schema fingerprint and reports when
/// the count reaches a threshold.
pub struct SchemaStabilityTracker {
    /// Number of consecutive stable rows required.
    threshold: u64,
    /// Rows seen so far under the current fingerprint.
    consecutive_stable: u64,
    /// Fingerprint from the most recent run; `None` before the first update.
    last_fingerprint: Option<u64>,
}

impl SchemaStabilityTracker {
    /// Builds a tracker from the first `SchemaStable` threshold found in
    /// `condition`, or returns `None` if the condition has none.
    pub fn from_condition(condition: &StopCondition) -> Option<Self> {
        schema_stable_threshold(condition).map(|threshold| Self {
            threshold,
            consecutive_stable: 0,
            last_fingerprint: None,
        })
    }

    /// Records a chunk of `chunk_rows` rows whose schema hashes to
    /// `fingerprint` and returns `true` once the stable-row count has reached
    /// the threshold.
    ///
    /// When the fingerprint differs from the previous one (or this is the
    /// first chunk), the count restarts at `chunk_rows`.
    pub fn update(&mut self, fingerprint: u64, chunk_rows: u64) -> bool {
        if self.last_fingerprint == Some(fingerprint) {
            // Saturate instead of `+=`: an overflow would panic in debug builds
            // and wrap the count below the threshold in release builds.
            self.consecutive_stable = self.consecutive_stable.saturating_add(chunk_rows);
        } else {
            self.consecutive_stable = chunk_rows;
            self.last_fingerprint = Some(fingerprint);
        }
        self.consecutive_stable >= self.threshold
    }

    /// Reason to report when this tracker ends the read; embeds the threshold.
    pub fn truncation_reason(&self) -> TruncationReason {
        TruncationReason::StopCondition(format!("schema_stable({})", self.threshold))
    }
}
/// Unit tests for the stop-condition evaluator and the schema-stability tracker.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_max_rows_stops() {
        let mut eval = StopEvaluator::new(StopCondition::MaxRows(100));
        assert!(!eval.update(50, 0, 0.0));
        assert!(!eval.should_stop());
        assert!(eval.update(50, 0, 0.0));
        assert!(eval.should_stop());
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxRows(100))
        );
    }

    #[test]
    fn test_max_bytes_stops() {
        let mut eval = StopEvaluator::new(StopCondition::MaxBytes(1000));
        assert!(!eval.update(10, 500, 0.0));
        assert!(eval.update(10, 600, 0.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxBytes(1000))
        );
    }

    #[test]
    fn test_memory_pressure_stops() {
        let mut eval = StopEvaluator::new(StopCondition::MemoryPressure(0.9));
        assert!(!eval.update(100, 0, 0.5));
        assert!(eval.update(100, 0, 0.95));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MemoryPressure)
        );
    }

    #[test]
    fn test_confidence_threshold_without_estimate() {
        // Without a row estimate the confidence condition can never fire.
        let mut eval = StopEvaluator::new(StopCondition::ConfidenceThreshold(0.95));
        assert!(!eval.update(100_000, 0, 0.0));
        assert!(!eval.should_stop());
    }

    #[test]
    fn test_confidence_threshold_with_estimate() {
        let mut eval =
            StopEvaluator::new(StopCondition::ConfidenceThreshold(0.95)).with_estimated_total(1000);
        // 900 / 1000 = 0.90, below the threshold.
        assert!(!eval.update(900, 0, 0.0));
        // 1000 / 1000 = 1.00, at or above the threshold.
        assert!(eval.update(100, 0, 0.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::StopCondition(
                "confidence_threshold(0.95)".to_string()
            ))
        );
    }

    #[test]
    fn test_out_of_range_thresholds_are_clamped() {
        // A confidence threshold above 1.0 is clamped to 1.0, so full coverage fires it.
        let mut eval =
            StopEvaluator::new(StopCondition::ConfidenceThreshold(1.5)).with_estimated_total(100);
        assert!(!eval.update(99, 0, 0.0));
        assert!(eval.update(1, 0, 0.0));

        // Clamping also reaches thresholds nested inside combinators.
        let nested = StopCondition::Any(vec![StopCondition::MemoryPressure(2.0)]);
        let mut eval = StopEvaluator::new(nested);
        assert!(!eval.update(0, 0, 0.99));
        assert!(eval.update(0, 0, 1.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MemoryPressure)
        );
    }

    #[test]
    fn test_never_never_stops() {
        let mut eval = StopEvaluator::new(StopCondition::Never);
        for _ in 0..100 {
            assert!(!eval.update(1_000_000, 1_000_000, 1.0));
        }
        assert!(!eval.should_stop());
    }

    #[test]
    fn test_any_stops_on_first() {
        let condition = StopCondition::Any(vec![
            StopCondition::MaxRows(100),
            StopCondition::MaxBytes(1_000_000),
        ]);
        let mut eval = StopEvaluator::new(condition);
        assert!(eval.update(100, 500, 0.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxRows(100))
        );
    }

    #[test]
    fn test_any_empty_never_triggers() {
        let mut eval = StopEvaluator::new(StopCondition::Any(vec![]));
        assert!(!eval.update(100, 100, 1.0));
        assert!(!eval.should_stop());
    }

    #[test]
    fn test_all_requires_all() {
        let condition = StopCondition::All(vec![
            StopCondition::MaxRows(100),
            StopCondition::MaxBytes(1000),
        ]);
        let mut eval = StopEvaluator::new(condition);
        // Row limit reached, byte limit not yet.
        assert!(!eval.update(100, 500, 0.0));
        // Both limits reached; the first nested reason is reported.
        assert!(eval.update(0, 600, 0.0));
        assert_eq!(
            eval.truncation_reason(),
            Some(TruncationReason::MaxRows(100))
        );
    }

    #[test]
    fn test_all_empty_never_triggers() {
        let mut eval = StopEvaluator::new(StopCondition::All(vec![]));
        assert!(!eval.update(100, 100, 1.0));
    }

    #[test]
    fn test_convenience_schema_inference() {
        let condition = StopCondition::schema_inference();
        match &condition {
            StopCondition::Any(conditions) => {
                assert_eq!(conditions.len(), 2);
                assert!(matches!(conditions[0], StopCondition::MaxRows(10_000)));
                assert!(matches!(
                    conditions[1],
                    StopCondition::SchemaStable {
                        consecutive_stable_rows: 1_000
                    }
                ));
            }
            _ => panic!("Expected Any variant"),
        }
    }

    #[test]
    fn test_convenience_quality_sample() {
        let condition = StopCondition::quality_sample();
        match &condition {
            StopCondition::Any(conditions) => {
                assert_eq!(conditions.len(), 3);
                assert!(matches!(conditions[0], StopCondition::MaxRows(50_000)));
                assert!(matches!(conditions[1], StopCondition::MaxBytes(52_428_800)));
                assert!(matches!(
                    conditions[2],
                    StopCondition::ConfidenceThreshold(t) if t == 0.95
                ));
            }
            _ => panic!("Expected Any variant"),
        }
    }

    #[test]
    fn test_once_triggered_stays_triggered() {
        let mut eval = StopEvaluator::new(StopCondition::MaxRows(10));
        assert!(eval.update(10, 0, 0.0));
        assert!(eval.update(5, 0, 0.0));
        assert!(eval.should_stop());
    }

    #[test]
    fn test_schema_stability_tracker() {
        let condition = StopCondition::SchemaStable {
            consecutive_stable_rows: 3,
        };
        let mut tracker = SchemaStabilityTracker::from_condition(&condition).unwrap();
        let fp: u64 = 0xABCD;
        assert!(!tracker.update(fp, 1));
        assert!(!tracker.update(fp, 1));
        assert!(tracker.update(fp, 1));
    }

    #[test]
    fn test_schema_stability_tracker_resets_on_change() {
        let condition = StopCondition::SchemaStable {
            consecutive_stable_rows: 3,
        };
        let mut tracker = SchemaStabilityTracker::from_condition(&condition).unwrap();
        let fp1: u64 = 0x1111;
        let fp2: u64 = 0x2222;
        assert!(!tracker.update(fp1, 1));
        assert!(!tracker.update(fp1, 1));
        // The fingerprint change restarts the count at this chunk's rows.
        assert!(!tracker.update(fp2, 1));
        assert!(!tracker.update(fp2, 1));
        assert!(tracker.update(fp2, 1));
    }

    #[test]
    fn test_schema_stability_tracker_requires_schema_condition() {
        assert!(SchemaStabilityTracker::from_condition(&StopCondition::MaxRows(10)).is_none());
        assert!(SchemaStabilityTracker::from_condition(&StopCondition::Never).is_none());
    }

    #[test]
    fn test_schema_stability_tracker_reason() {
        let condition = StopCondition::SchemaStable {
            consecutive_stable_rows: 3,
        };
        let tracker = SchemaStabilityTracker::from_condition(&condition).unwrap();
        assert_eq!(
            tracker.truncation_reason(),
            TruncationReason::StopCondition("schema_stable(3)".to_string())
        );
    }

    #[test]
    fn test_schema_stable_threshold_extraction() {
        assert_eq!(schema_stable_threshold(&StopCondition::Never), None);
        assert_eq!(
            schema_stable_threshold(&StopCondition::SchemaStable {
                consecutive_stable_rows: 500
            }),
            Some(500)
        );
        let nested = StopCondition::Any(vec![
            StopCondition::MaxRows(100),
            StopCondition::SchemaStable {
                consecutive_stable_rows: 200,
            },
        ]);
        assert_eq!(schema_stable_threshold(&nested), Some(200));
        let nested_all = StopCondition::All(vec![
            StopCondition::MaxBytes(100),
            StopCondition::SchemaStable {
                consecutive_stable_rows: 300,
            },
        ]);
        assert_eq!(schema_stable_threshold(&nested_all), Some(300));
    }

    #[test]
    fn test_rows_and_bytes_accessors() {
        let mut eval = StopEvaluator::new(StopCondition::Never);
        eval.update(100, 500, 0.0);
        eval.update(200, 1000, 0.0);
        assert_eq!(eval.rows_processed(), 300);
        assert_eq!(eval.bytes_consumed(), 1500);
    }
}