use super::*;
use arrow::array::*;
use arrow::datatypes::{DataType, Field, Schema};
use std::sync::Arc;
/// Stripping the internal row-number column from a schema keeps the
/// remaining fields, in their original order.
#[test]
fn strip_internal_column_from_schema() {
    let original = Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("_rivet_chunk_rn", DataType::Int64, false),
        Field::new("name", DataType::Utf8, false),
    ]);
    let stripped = ExportSink::schema_without_internal(&original, "_rivet_chunk_rn").unwrap();
    assert_eq!(stripped.fields().len(), 2);
    for (idx, expected) in ["id", "name"].iter().enumerate() {
        assert_eq!(stripped.field(idx).name(), *expected);
    }
}
/// Asking to strip a column the schema does not contain is an error.
#[test]
fn strip_internal_column_missing_errors() {
    let only_id = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
    let outcome = ExportSink::schema_without_internal(&only_id, "nonexistent");
    assert!(outcome.is_err());
}
/// Stripping the internal column from a record batch drops that column and
/// leaves the remaining column's values untouched.
#[test]
fn strip_internal_column_from_batch() {
    let id_field = Field::new("id", DataType::Int64, false);
    let rn_field = Field::new("_rivet_chunk_rn", DataType::Int64, false);
    let schema = Arc::new(Schema::new(vec![id_field, rn_field]));
    let columns: Vec<ArrayRef> = vec![
        Arc::new(Int64Array::from(vec![1, 2])),
        Arc::new(Int64Array::from(vec![100, 200])),
    ];
    let batch = RecordBatch::try_new(schema, columns).unwrap();

    let stripped =
        ExportSink::record_batch_without_internal(&batch, "_rivet_chunk_rn").unwrap();
    assert_eq!(stripped.num_columns(), 1);
    assert_eq!(stripped.schema().field(0).name(), "id");

    let ids = stripped
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .unwrap();
    for (row, expected) in [1, 2].iter().enumerate() {
        assert_eq!(ids.value(row), *expected);
    }
}
/// I1 invariant: after `on_schema` + `on_batch`, finishing the writer must
/// succeed and leave a non-empty temp file. Only a finished file may be handed
/// to a destination, so the destination never receives a truncated file.
#[test]
fn i1_writer_finish_produces_complete_file_before_destination_write() {
    use crate::source::BatchSink;
    let mut sink = minimal_sink();
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from(vec!["alice", "bob", "carol"]))],
    )
    .unwrap();

    assert!(sink.writer.is_none(), "writer must start as None");
    sink.on_schema(schema).unwrap();
    assert!(
        sink.writer.is_some(),
        "writer must be present after on_schema"
    );
    sink.on_batch(&batch).unwrap();

    // `expect` instead of `if let`: a missing writer here must fail the test
    // loudly rather than silently skipping the finish() step under test.
    let writer = sink
        .writer
        .take()
        .expect("writer must still be present after on_batch");
    writer
        .finish()
        .expect("I1: writer.finish() must succeed before destination write");

    let file_len = std::fs::metadata(sink.tmp.path())
        .expect("temp file must be accessible after finish()")
        .len();
    assert!(
        file_len > 0,
        "I1: temp file must be non-empty after writer.finish() — \
         the destination must never receive a truncated file; got {} bytes",
        file_len
    );
    assert_eq!(sink.total_rows, 3, "total_rows must count written rows");
}
/// `track_quality` counts NULL cells per configured null-ratio column.
#[test]
fn track_quality_counts_nulls() {
    use crate::source::BatchSink;
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
    let names = StringArray::from(vec![Some("a"), None, None, Some("d")]);
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(names)]).unwrap();

    let mut sink = minimal_sink_with_quality(vec!["name".into()], vec![]);
    sink.on_schema(schema).unwrap();
    sink.track_quality(&batch);

    let nulls_seen = sink.quality_null_counts.get("name");
    assert_eq!(nulls_seen, Some(&2));
}
/// `track_quality` records each distinct value once for a unique column.
#[test]
fn track_quality_counts_uniques() {
    use crate::source::BatchSink;
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let ids = Int64Array::from(vec![1, 2, 1, 3]);
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(ids)]).unwrap();

    let mut sink = minimal_sink_with_quality(vec![], vec!["id".into()]);
    sink.on_schema(schema).unwrap();
    sink.track_quality(&batch);

    let distinct = sink.quality_unique_sets.get("id").unwrap().len();
    assert_eq!(distinct, 3);
}
/// With no quality configuration, no issues are reported regardless of row count.
#[test]
fn run_quality_checks_no_config_returns_empty() {
    let mut sink = minimal_sink();
    sink.quality_columns = None;
    sink.total_rows = 100;
    let issues = sink.run_quality_checks();
    assert!(issues.is_empty());
}
/// 50 NULLs out of 100 rows breaches a 10% null-ratio ceiling.
#[test]
fn run_quality_checks_detects_excess_nulls() {
    let mut sink = minimal_sink_with_quality(vec!["col".into()], vec![]);
    let config = sink.quality_columns.as_mut().unwrap();
    // Tighten the helper's default ceiling for this column.
    config.null_ratio_max.insert("col".into(), 0.1);

    sink.total_rows = 100;
    sink.quality_null_counts.insert("col".into(), 50);

    let issues = sink.run_quality_checks();
    assert_eq!(issues.len(), 1);
    assert!(issues[0].message.contains("null ratio"));
}
/// Five non-NULL values but only three distinct ones is reported as duplicates.
#[test]
fn run_quality_checks_detects_duplicates() {
    let mut sink = minimal_sink_with_quality(vec![], vec!["id".into()]);
    sink.total_rows = 5;
    let distinct: std::collections::HashSet<u64> = (0u64..3).collect();
    sink.quality_unique_sets.insert("id".into(), distinct);
    sink.quality_unique_non_null_counts.insert("id".into(), 5);

    let issues = sink.run_quality_checks();
    assert_eq!(issues.len(), 1);
    assert!(issues[0].message.contains("duplicate"));
}
/// Builds a baseline sink whose quality config tracks uniqueness for
/// `unique_cols` only, with `unique_max_entries` set to `cap`.
fn sink_with_unique_cap(unique_cols: Vec<String>, cap: usize) -> ExportSink {
    let quality = crate::config::QualityConfig {
        row_count_min: None,
        row_count_max: None,
        null_ratio_max: Default::default(),
        unique_columns: unique_cols,
        unique_max_entries: Some(cap),
    };
    let mut sink = minimal_sink();
    sink.quality_columns = Some(quality);
    sink
}
/// Feeding 5 distinct values into a cap-3 tracker must stop the set at the
/// cap and flag the column as capped.
#[test]
fn unique_cap_stops_inserting_at_limit() {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let ids = Int64Array::from(vec![1, 2, 3, 4, 5]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids)]).unwrap();

    let mut sink = sink_with_unique_cap(vec!["id".into()], 3);
    // Set the tracked column index directly instead of calling on_schema.
    sink.quality_unique_indices = vec![(0, "id".into())];
    sink.track_quality(&batch);

    let set_len = match sink.quality_unique_sets.get("id") {
        Some(set) => set.len(),
        None => 0,
    };
    assert!(set_len <= 3, "set must not grow past cap; got {set_len}");
    assert!(
        sink.quality_unique_capped.contains("id"),
        "id must be flagged as capped"
    );
}
/// A capped unique column yields a single Warn-severity "capped" issue
/// instead of a duplicate failure.
#[test]
fn unique_cap_emits_warn_issue_not_fail() {
    let mut sink = sink_with_unique_cap(vec!["id".into()], 2);
    sink.total_rows = 5;
    sink.quality_unique_capped.insert("id".into());
    let partial: std::collections::HashSet<u64> = [0u64, 1].iter().copied().collect();
    sink.quality_unique_sets.insert("id".into(), partial);

    let issues = sink.run_quality_checks();
    assert_eq!(issues.len(), 1);
    let only = &issues[0];
    assert_eq!(only.severity, crate::quality::Severity::Warn);
    assert!(
        only.message.contains("capped"),
        "message must say 'capped'; got: {}",
        only.message
    );
}
/// Without `unique_max_entries`, every distinct value is tracked and nothing
/// is flagged as capped.
#[test]
fn unique_no_cap_grows_unbounded() {
    use crate::source::BatchSink;
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let ten_distinct: ArrayRef = Arc::new(Int64Array::from_iter_values(0..10i64));
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![ten_distinct]).unwrap();

    let mut sink = minimal_sink_with_quality(vec![], vec!["id".into()]);
    sink.on_schema(schema).unwrap();
    sink.track_quality(&batch);

    let tracked = sink.quality_unique_sets.get("id").unwrap().len();
    assert_eq!(
        tracked, 10,
        "without cap all distinct values must be tracked"
    );
    assert!(
        sink.quality_unique_capped.is_empty(),
        "no cap → no column should be flagged"
    );
}
/// Once a column hits its cap in one batch, later batches must not add to
/// its distinct-value set.
#[test]
fn unique_cap_column_skipped_in_subsequent_batches() {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let make = |vals: Vec<i64>| {
        RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int64Array::from(vals)) as ArrayRef],
        )
        .unwrap()
    };
    let batch1 = make(vec![1, 2, 3]);
    let batch2 = make(vec![4, 5, 6]);

    let mut sink = sink_with_unique_cap(vec!["id".into()], 2);
    sink.quality_unique_indices = vec![(0, "id".into())];
    let tracked_len =
        |s: &ExportSink| s.quality_unique_sets.get("id").map_or(0, |set| set.len());

    sink.track_quality(&batch1);
    let len_after_first = tracked_len(&sink);
    sink.track_quality(&batch2);
    let len_after_second = tracked_len(&sink);

    assert_eq!(
        len_after_first, len_after_second,
        "set must not grow after cap is hit; was {len_after_first}, then {len_after_second}"
    );
}
/// Wraps `values` in a one-column batch whose nullable Utf8 column is named `body`.
fn utf8_batch(values: Vec<&str>) -> RecordBatch {
    let body = StringArray::from(values);
    let field = Field::new("body", DataType::Utf8, true);
    let schema = Arc::new(Schema::new(vec![field]));
    RecordBatch::try_new(schema, vec![Arc::new(body)]).unwrap()
}
/// A 2 KiB cell against a 1 KiB ceiling is rejected with an error that carries
/// the RIVET_VALUE_TOO_LARGE tag and names the offending column.
#[test]
fn value_ceiling_rejects_oversized_cell() {
    let mut sink = minimal_sink();
    sink.max_value_bytes = Some(1024);

    let big = "x".repeat(2048);
    let batch = utf8_batch(vec!["small", big.as_str()]);

    let failure = sink.check_value_ceiling(&batch).unwrap_err();
    let msg = format!("{failure:#}");
    assert!(msg.contains("RIVET_VALUE_TOO_LARGE"), "got: {msg}");
    assert!(msg.contains("body"), "should name the column: {msg}");
}
/// Short values well under the ceiling pass the check.
#[test]
fn value_ceiling_passes_under_limit() {
    let mut sink = minimal_sink();
    sink.max_value_bytes = Some(1024);
    let small_values = utf8_batch(vec!["a", "bb", "ccc"]);
    let verdict = sink.check_value_ceiling(&small_values);
    assert!(verdict.is_ok());
}
/// `max_value_bytes = None` turns the guard off, even for a 10 MiB cell.
#[test]
fn value_ceiling_disabled_when_none() {
    let mut sink = minimal_sink();
    sink.max_value_bytes = None;

    let ten_mib = 10 * 1024 * 1024;
    let big = "x".repeat(ten_mib);
    let batch = utf8_batch(vec![big.as_str()]);

    let verdict = sink.check_value_ceiling(&batch);
    assert!(verdict.is_ok(), "None must disable the guard entirely");
}
/// NULL cells carry no payload and must not trip the value ceiling.
#[test]
fn value_ceiling_ignores_null_cells() {
    let mut sink = minimal_sink();
    sink.max_value_bytes = Some(1024);

    let body = StringArray::from(vec![Some("ok"), None]);
    let schema = Arc::new(Schema::new(vec![Field::new("body", DataType::Utf8, true)]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(body)]).unwrap();

    let verdict = sink.check_value_ceiling(&batch);
    assert!(verdict.is_ok());
}
/// Baseline sink for unit tests.
///
/// It writes CSV without compression to a fresh temporary file. Every limit
/// and quality option is disabled. There is no writer yet: tests that need one
/// call `on_schema` or inject a test double.
fn minimal_sink() -> ExportSink {
    ExportSink {
        // Output format and target file.
        format_type: crate::config::FormatType::Csv,
        compression: crate::config::CompressionType::None,
        compression_level: None,
        parquet_config: None,
        parquet_row_group_rows: None,
        tmp: tempfile::NamedTempFile::new().unwrap(),
        writer: None,
        // Schema state.
        schema: None,
        dest_schema: None,
        enriched_schema: None,
        strip_internal_column: None,
        meta: crate::config::MetaColumns::default(),
        exported_at_us: 0,
        // Row / part bookkeeping.
        total_rows: 0,
        part_rows: 0,
        max_file_size: None,
        completed_parts: Vec::new(),
        cursor_column: None,
        last_cursor_value: None,
        // Quality tracking.
        quality_columns: None,
        quality_null_counts: Default::default(),
        quality_unique_sets: Default::default(),
        quality_unique_non_null_counts: Default::default(),
        quality_unique_capped: Default::default(),
        quality_null_indices: Vec::new(),
        quality_unique_indices: Vec::new(),
        // Size guards.
        column_max_bytes: Default::default(),
        max_batch_memory_bytes: None,
        max_value_bytes: None,
        batch_memory_policy: crate::tuning::BatchMemoryPolicy::Warn,
        oversized_batch_count: 0,
        // Checksums.
        column_checksums: Default::default(),
        checksum_key_col: None,
    }
}
/// Baseline sink with a quality config.
///
/// Each column in `null_cols` gets a 0.5 null-ratio ceiling, and each column
/// in `unique_cols` is uniqueness-checked with no entry cap.
fn minimal_sink_with_quality(null_cols: Vec<String>, unique_cols: Vec<String>) -> ExportSink {
    ExportSink {
        quality_columns: Some(crate::config::QualityConfig {
            row_count_min: None,
            row_count_max: None,
            null_ratio_max: null_cols.into_iter().map(|col| (col, 0.5)).collect(),
            unique_columns: unique_cols,
            unique_max_entries: None,
        }),
        ..minimal_sink()
    }
}
fn make_int_batch(n_rows: usize, n_cols: usize) -> RecordBatch {
use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field, Schema};
let fields: Vec<Field> = (0..n_cols)
.map(|i| Field::new(format!("c{i}"), DataType::Int64, false))
.collect();
let schema = Arc::new(Schema::new(fields));
let cols: Vec<Arc<dyn arrow::array::Array>> = (0..n_cols)
.map(|_| {
Arc::new(Int64Array::from_iter_values(0..n_rows as i64)) as Arc<dyn arrow::array::Array>
})
.collect();
RecordBatch::try_new(schema, cols).unwrap()
}
/// Under the Warn policy, `on_batch_inner` accepts a batch above the memory
/// ceiling. The test also asserts it leaves `oversized_batch_count` at 0.
#[test]
fn warn_policy_does_not_fail_on_oversized_batch() {
    let batch = make_int_batch(1_000, 10);
    let batch_bytes = crate::tuning::SourceTuning::batch_memory_bytes(&batch);
    assert!(batch_bytes > 0);

    // A 1-byte ceiling makes any non-empty batch oversized.
    let mut sink = ExportSink {
        max_batch_memory_bytes: Some(1),
        batch_memory_policy: crate::tuning::BatchMemoryPolicy::Warn,
        ..minimal_sink()
    };
    // Separate Arc holding an equal copy of the batch's schema (c0..c9, Int64).
    sink.enriched_schema = Some(Arc::new(batch.schema().as_ref().clone()));

    assert!(sink.on_batch_inner(&batch).is_ok());
    assert_eq!(sink.oversized_batch_count, 0);
}
/// Under `BatchMemoryPolicy::Fail`, a batch above `max_batch_memory_bytes`
/// is rejected. The error must mention "exceeds", and `oversized_batch_count`
/// must record the rejection.
#[test]
fn fail_policy_returns_error_on_oversized_batch() {
    // Same shape as before: 5 non-nullable Int64 columns of 100 rows each.
    let batch = make_int_batch(100, 5);
    let mut sink = ExportSink {
        // A 1-byte ceiling makes any non-empty batch oversized.
        max_batch_memory_bytes: Some(1),
        batch_memory_policy: crate::tuning::BatchMemoryPolicy::Fail,
        enriched_schema: Some(batch.schema()),
        ..minimal_sink()
    };

    let result = sink.on_batch(&batch);
    assert!(result.is_err(), "Fail policy should return Err");
    // `{:#}` renders the whole anyhow context chain, so the check still holds
    // if the "exceeds" message is wrapped in outer context.
    let msg = format!("{:#}", result.unwrap_err());
    assert!(
        msg.contains("exceeds"),
        "error should mention 'exceeds'; got: {msg}"
    );
    assert_eq!(sink.oversized_batch_count, 1);
}
/// 1_000 rows x 4 Int64 columns is 32_000 bytes of value data (8 bytes per
/// cell), so the memory estimate must be at least that.
#[test]
fn batch_memory_bytes_returns_positive_for_non_empty_batch() {
    let bytes = crate::tuning::SourceTuning::batch_memory_bytes(&make_int_batch(1_000, 4));
    assert!(bytes >= 32_000, "got {}", bytes);
}
fn sink_with_auto_shrink(limit_bytes: usize, n_cols: usize) -> ExportSink {
use arrow::datatypes::{DataType, Field, Schema};
let schema = Arc::new(Schema::new(
(0..n_cols)
.map(|i| Field::new(format!("c{i}"), DataType::Int64, false))
.collect::<Vec<_>>(),
));
ExportSink {
max_batch_memory_bytes: Some(limit_bytes),
batch_memory_policy: crate::tuning::BatchMemoryPolicy::AutoShrink,
enriched_schema: Some(schema),
..minimal_sink()
}
}
/// Auto-shrinking a 100-row batch must neither drop nor duplicate rows.
#[test]
fn auto_shrink_preserves_row_count_exact() {
    let batch = make_int_batch(100, 4);
    let mut sink = sink_with_auto_shrink(1, 4);
    let outcome = sink.on_batch(&batch);
    assert!(outcome.is_ok());
    let written = sink.total_rows;
    assert_eq!(
        written, 100,
        "auto_shrink must not lose or duplicate rows; got {}",
        written
    );
}
/// A larger, wider batch under a 1-byte ceiling still writes every row.
#[test]
fn auto_shrink_recursive_split_large_batch() {
    let batch = make_int_batch(1_000, 8);
    let mut sink = sink_with_auto_shrink(1, 8);
    let outcome = sink.on_batch(&batch);
    assert!(outcome.is_ok());
    let written = sink.total_rows;
    assert_eq!(
        written, 1_000,
        "recursive auto_shrink must preserve all rows; got {}",
        written
    );
}
/// An odd row count must not lose its trailing row when split.
#[test]
fn auto_shrink_odd_row_count_no_row_lost() {
    let batch = make_int_batch(101, 4);
    let mut sink = sink_with_auto_shrink(1, 4);
    let outcome = sink.on_batch(&batch);
    assert!(outcome.is_ok());
    let written = sink.total_rows;
    assert_eq!(
        written, 101,
        "odd-row batch must not lose the extra row; got {}",
        written
    );
}
/// A single row that alone exceeds the ceiling cannot be split further. It
/// must still be written, without error or panic.
#[test]
fn auto_shrink_single_row_over_limit_writes_and_does_not_panic() {
    let single_row = make_int_batch(1, 4);
    let mut sink = sink_with_auto_shrink(1, 4);
    let result = sink.on_batch(&single_row);
    assert!(
        result.is_ok(),
        "single-row over-limit must not fail: {:?}",
        result
    );
    let written = sink.total_rows;
    assert_eq!(written, 1, "single row must be written; got {}", written);
}
/// The oversized counter counts the incoming batch once, not once per
/// sub-batch produced by splitting.
#[test]
fn auto_shrink_oversized_count_tracks_original_batches_only() {
    let mut sink = sink_with_auto_shrink(1, 4);
    let batch = make_int_batch(100, 4);
    sink.on_batch(&batch).unwrap();
    let count = sink.oversized_batch_count;
    assert_eq!(
        count, 1,
        "one original batch triggered the cap; got {}",
        count
    );
}
/// A batch comfortably under the ceiling is written as-is and is not counted
/// as oversized.
#[test]
fn auto_shrink_batch_within_limit_writes_directly() {
    let batch = make_int_batch(10, 2);
    // Twice the batch's own footprint leaves ample headroom.
    let limit = 2 * crate::tuning::SourceTuning::batch_memory_bytes(&batch);
    let mut sink = sink_with_auto_shrink(limit, 2);
    sink.on_batch(&batch).unwrap();
    assert_eq!(sink.total_rows, 10, "in-limit batch: all rows written");
    assert_eq!(
        sink.oversized_batch_count, 0,
        "in-limit batch must not increment oversized counter"
    );
}
/// An empty export under a `row_count_min` of 100 fails with a row_count
/// minimum issue.
#[test]
fn row_count_min_fires_when_total_rows_is_zero() {
    let config = crate::config::QualityConfig {
        row_count_min: Some(100),
        row_count_max: None,
        null_ratio_max: Default::default(),
        unique_columns: Vec::new(),
        unique_max_entries: None,
    };
    let mut sink = minimal_sink();
    sink.quality_columns = Some(config);

    let issues = sink.run_quality_checks();
    assert_eq!(issues.len(), 1, "must emit exactly one issue");
    let issue = &issues[0];
    assert_eq!(issue.severity, crate::quality::Severity::Fail);
    assert!(
        issue.message.contains("row_count") && issue.message.contains("minimum"),
        "message must identify row_count minimum; got: {}",
        issue.message
    );
}
/// Hitting the minimum row count exactly is a pass (the bound is inclusive).
#[test]
fn row_count_min_passes_when_row_count_meets_threshold() {
    let config = crate::config::QualityConfig {
        row_count_min: Some(5),
        row_count_max: None,
        null_ratio_max: Default::default(),
        unique_columns: Vec::new(),
        unique_max_entries: None,
    };
    let mut sink = minimal_sink();
    sink.quality_columns = Some(config);
    sink.total_rows = 5;

    let issues = sink.run_quality_checks();
    assert!(issues.is_empty(), "exact minimum must pass");
}
/// Test double that accepts the first `fail_after` batches; every later
/// `write_batch` call returns an injected "gremlin" error.
struct FailingWriter {
    /// Number of `write_batch` calls seen so far.
    calls: usize,
    /// How many calls succeed before failures begin.
    fail_after: usize,
}

impl crate::format::FormatWriter for FailingWriter {
    fn write_batch(&mut self, _batch: &RecordBatch) -> crate::error::Result<()> {
        self.calls += 1;
        if self.calls <= self.fail_after {
            return Ok(());
        }
        anyhow::bail!("gremlin: injected write failure at call {}", self.calls)
    }

    fn finish(self: Box<Self>) -> crate::error::Result<()> {
        Ok(())
    }

    fn bytes_written(&self) -> u64 {
        0
    }
}
/// Test double that accepts every batch but reports a fixed, very large
/// `bytes_written`. Paired with a tiny `max_file_size`, it presumably forces a
/// file split after each write.
struct SplitTriggerWriter {
    /// Number of batches accepted.
    write_count: usize,
}

impl crate::format::FormatWriter for SplitTriggerWriter {
    fn write_batch(&mut self, _batch: &RecordBatch) -> crate::error::Result<()> {
        self.write_count += 1;
        Ok(())
    }

    fn finish(self: Box<Self>) -> crate::error::Result<()> {
        Ok(())
    }

    fn bytes_written(&self) -> u64 {
        const ALWAYS_OVER_LIMIT: u64 = 999_999_999;
        ALWAYS_OVER_LIMIT
    }
}
/// With AutoShrink and a 1-byte ceiling, the 4-row batch should be split. The
/// injected writer accepts only the first write, and the resulting failure
/// must surface as `Err` from `on_batch`.
#[test]
fn gremlin_writer_fail_on_second_subbatch_propagates_error() {
    let batch = make_int_batch(4, 2);
    let two_cols = Schema::new(vec![
        Field::new("c0", DataType::Int64, false),
        Field::new("c1", DataType::Int64, false),
    ]);
    let gremlin = FailingWriter {
        calls: 0,
        fail_after: 1,
    };
    let mut sink = ExportSink {
        max_batch_memory_bytes: Some(1),
        batch_memory_policy: crate::tuning::BatchMemoryPolicy::AutoShrink,
        enriched_schema: Some(Arc::new(two_cols)),
        writer: Some(Box::new(gremlin)),
        ..minimal_sink()
    };

    let result = sink.on_batch(&batch);
    assert!(
        result.is_err(),
        "writer failure inside auto_shrink must propagate as Err"
    );
}
/// A writer that fails on its very first call (`fail_after = 0`) must make
/// `on_batch` return `Err`.
#[test]
fn gremlin_writer_fail_before_any_write_propagates_error() {
    let batch = make_int_batch(2, 2);
    let mut sink = minimal_sink();
    sink.enriched_schema = Some(Arc::new(Schema::new(vec![
        Field::new("c0", DataType::Int64, false),
        Field::new("c1", DataType::Int64, false),
    ])));
    sink.writer = Some(Box::new(FailingWriter {
        calls: 0,
        fail_after: 0,
    }));

    let result = sink.on_batch(&batch);
    assert!(
        result.is_err(),
        "immediate writer failure must propagate as Err"
    );
}
/// With auto-shrink and file splitting both active, the rows in completed
/// parts plus the current part must equal `total_rows`. All 60 rows (3 × 20)
/// must also be counted.
#[test]
fn gremlin_auto_shrink_plus_file_split_total_rows_consistent() {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let mut sink = ExportSink {
        max_batch_memory_bytes: Some(1),
        batch_memory_policy: crate::tuning::BatchMemoryPolicy::AutoShrink,
        max_file_size: Some(1),
        enriched_schema: Some(Arc::clone(&schema)),
        writer: Some(Box::new(SplitTriggerWriter { write_count: 0 })),
        ..minimal_sink()
    };
    sink.schema = Some(Arc::clone(&schema));

    let ids: ArrayRef = Arc::new(Int64Array::from_iter_values(0..20i64));
    let batch = RecordBatch::try_new(schema, vec![ids]).unwrap();
    for _round in 1..=3 {
        sink.on_batch(&batch).unwrap();
    }

    let parts_rows: usize = sink.completed_parts.iter().map(|p| p.rows).sum();
    assert_eq!(
        parts_rows + sink.part_rows,
        sink.total_rows,
        "completed_parts + part_rows must equal total_rows; \
         parts_rows={parts_rows}, part_rows={}, total_rows={}",
        sink.part_rows,
        sink.total_rows
    );
    assert_eq!(sink.total_rows, 60, "all 60 rows must be counted");
}
/// Exactly `cap` distinct values must not flag the column as capped; one
/// more distinct value must flag it.
#[test]
fn gremlin_unique_cap_exact_boundary_no_false_capped_flag() {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let at_cap: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5]));
    let exact_batch = RecordBatch::try_new(Arc::clone(&schema), vec![at_cap]).unwrap();

    let mut sink = sink_with_unique_cap(vec!["id".into()], 5);
    sink.quality_unique_indices = vec![(0, "id".into())];
    sink.track_quality(&exact_batch);

    assert!(
        !sink.quality_unique_capped.contains("id"),
        "exactly cap distinct values must NOT set the capped flag"
    );
    let tracked = sink.quality_unique_sets.get("id").unwrap().len();
    assert_eq!(tracked, 5, "all 5 distinct values must be tracked");

    let one_more: ArrayRef = Arc::new(Int64Array::from(vec![6]));
    let overflow_batch = RecordBatch::try_new(schema, vec![one_more]).unwrap();
    sink.track_quality(&overflow_batch);
    assert!(
        sink.quality_unique_capped.contains("id"),
        "cap+1 distinct values must set the capped flag"
    );
}
/// A zero-row batch is accepted without panicking and adds nothing to
/// `total_rows`.
#[test]
fn gremlin_zero_row_batch_does_not_panic_or_increment_rows() {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));
    let no_ids: ArrayRef = Arc::new(Int64Array::from(Vec::<i64>::new()));
    let empty = RecordBatch::try_new(schema, vec![no_ids]).unwrap();

    let mut sink = minimal_sink();
    sink.on_batch(&empty).unwrap();
    assert_eq!(
        sink.total_rows, 0,
        "zero-row batch must not increment total_rows"
    );
}
/// Three batches are written once directly through an `ExportSink` and once
/// through `PipelinedSink`. Both runs must report the same row count and
/// produce files with equal part checksums.
#[test]
fn pipelined_sink_output_is_byte_identical_to_synchronous() {
    use crate::pipeline::manifest_writer::compute_part_checksums;
    use crate::source::BatchSink;
    let batches: Vec<RecordBatch> = std::iter::repeat_with(|| make_int_batch(100, 4))
        .take(3)
        .collect();
    let schema_ref = batches[0].schema();

    // Reference run: drive the sink directly.
    let mut direct = minimal_sink();
    direct.on_schema(Arc::clone(&schema_ref)).unwrap();
    for batch in &batches {
        direct.on_batch(batch).unwrap();
    }
    direct.writer.take().unwrap().finish().unwrap();
    let sync_fp = compute_part_checksums(direct.tmp.path()).unwrap().0;
    let sync_rows = direct.total_rows;

    // Pipelined run: the same batches through PipelinedSink, which hands the
    // inner sink back on finish().
    let mut pipelined = PipelinedSink::spawn_with_sink(minimal_sink());
    pipelined.on_schema(Arc::clone(&schema_ref)).unwrap();
    for batch in &batches {
        pipelined.on_batch(batch).unwrap();
    }
    let mut recovered = pipelined.finish().unwrap();
    recovered.writer.take().unwrap().finish().unwrap();
    let p_fp = compute_part_checksums(recovered.tmp.path()).unwrap().0;

    assert_eq!(sync_rows, 300);
    assert_eq!(recovered.total_rows, sync_rows, "row count must match");
    assert_eq!(
        sync_fp, p_fp,
        "pipelined output must be byte-identical to synchronous"
    );
}
/// Repeated NULLs in a unique column are not duplicates (SQL UNIQUE
/// semantics), so no "duplicate" issue may be reported.
#[test]
fn roast_unique_check_treats_nulls_as_non_duplicates() {
    use crate::source::BatchSink;
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
    let names = StringArray::from(vec![Some("a"), None, None, Some("b")]);
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(names)]).unwrap();

    let mut sink = minimal_sink_with_quality(vec![], vec!["name".into()]);
    sink.on_schema(schema).unwrap();
    sink.on_batch(&batch).unwrap();

    let issues = sink.run_quality_checks();
    let dupe_issues: Vec<String> = issues
        .iter()
        .filter_map(|issue| {
            issue
                .message
                .contains("duplicate")
                .then(|| format!("[{:?}] {}", issue.severity, issue.message))
        })
        .collect();
    assert!(
        dupe_issues.is_empty(),
        "column 'name' holds distinct non-NULL values ('a', 'b') plus 2 NULLs — \
         per SQL UNIQUE semantics NULLs are not duplicates, so the quality gate \
         must pass with zero duplicates; got false duplicate report(s): {:?}",
        dupe_issues
    );
}
/// A unique column with only NULLs has no duplicates and raises no issues.
#[test]
fn unique_check_all_null_column_reports_zero_issues() {
    use crate::source::BatchSink;
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
    let all_null = StringArray::from(vec![None::<&str>, None, None]);
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(all_null)]).unwrap();

    let mut sink = minimal_sink_with_quality(vec![], vec!["name".into()]);
    sink.on_schema(schema).unwrap();
    sink.on_batch(&batch).unwrap();

    let issues = sink.run_quality_checks();
    let messages: Vec<&str> = issues.iter().map(|i| i.message.as_str()).collect();
    assert!(
        issues.is_empty(),
        "an all-NULL unique column has zero duplicates and must pass; got: {:?}",
        messages
    );
}
/// Real duplicates ('a' twice, 'b' twice) are reported as exactly 2, and the
/// interleaved NULLs do not inflate the count.
#[test]
fn unique_check_counts_only_real_duplicates_when_nulls_present() {
    use crate::source::BatchSink;
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
    let names = StringArray::from(vec![
        Some("a"),
        Some("a"),
        None,
        Some("b"),
        None,
        Some("b"),
        Some("c"),
    ]);
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(names)]).unwrap();

    let mut sink = minimal_sink_with_quality(vec![], vec!["name".into()]);
    sink.on_schema(schema).unwrap();
    sink.on_batch(&batch).unwrap();

    let issues = sink.run_quality_checks();
    let messages: Vec<&str> = issues.iter().map(|i| i.message.as_str()).collect();
    assert_eq!(
        issues.len(),
        1,
        "real duplicates must still be reported; got: {:?}",
        messages
    );
    let issue = &issues[0];
    assert_eq!(issue.severity, crate::quality::Severity::Fail);
    assert!(
        issue.message.contains("2 duplicate"),
        "NULLs must not inflate the count — exactly 2 duplicates; got: {}",
        issue.message
    );
}
/// An empty string and a NULL are different things: neither is a duplicate
/// of the other.
#[test]
fn unique_check_empty_string_distinct_from_null() {
    use crate::source::BatchSink;
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
    let names = StringArray::from(vec![Some(""), None, Some("x")]);
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(names)]).unwrap();

    let mut sink = minimal_sink_with_quality(vec![], vec!["name".into()]);
    sink.on_schema(schema).unwrap();
    sink.on_batch(&batch).unwrap();

    let issues = sink.run_quality_checks();
    let messages: Vec<&str> = issues.iter().map(|i| i.message.as_str()).collect();
    assert!(
        issues.is_empty(),
        "one empty string plus one NULL are distinct, not duplicates; got: {:?}",
        messages
    );
}
/// Two empty strings are one real duplicate; the NULL is not counted.
#[test]
fn unique_check_duplicate_empty_strings_counted_nulls_excluded() {
    use crate::source::BatchSink;
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
    let names = StringArray::from(vec![Some(""), Some(""), None, Some("x")]);
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(names)]).unwrap();

    let mut sink = minimal_sink_with_quality(vec![], vec!["name".into()]);
    sink.on_schema(schema).unwrap();
    sink.on_batch(&batch).unwrap();

    let issues = sink.run_quality_checks();
    let messages: Vec<&str> = issues.iter().map(|i| i.message.as_str()).collect();
    assert_eq!(
        issues.len(),
        1,
        "the repeated empty string is one real duplicate; got: {:?}",
        messages
    );
    let issue = &issues[0];
    assert!(
        issue.message.contains("1 duplicate"),
        "exactly 1 duplicate (the second \"\"), NULL excluded; got: {}",
        issue.message
    );
}
/// Non-NULL counts accumulate across batches. Sending `['a', NULL]` twice
/// yields exactly one duplicate ('a'), with the NULLs excluded.
#[test]
fn unique_check_non_null_counter_accumulates_across_batches() {
    use crate::source::BatchSink;
    let schema = Arc::new(Schema::new(vec![Field::new("name", DataType::Utf8, true)]));
    let names = StringArray::from(vec![Some("a"), None]);
    let batch = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(names)]).unwrap();

    let mut sink = minimal_sink_with_quality(vec![], vec!["name".into()]);
    sink.on_schema(schema).unwrap();
    for _ in 0..2 {
        sink.on_batch(&batch).unwrap();
    }

    let issues = sink.run_quality_checks();
    let messages: Vec<&str> = issues.iter().map(|i| i.message.as_str()).collect();
    assert_eq!(
        issues.len(),
        1,
        "the repeated 'a' across batches is one real duplicate; got: {:?}",
        messages
    );
    let issue = &issues[0];
    assert!(
        issue.message.contains("1 duplicate"),
        "exactly 1 duplicate ('a'), NULLs excluded; got: {}",
        issue.message
    );
}
/// Three distinct values plus NULLs at cap=2 flag the column as capped. The
/// only issue reported is the capped Warn, with no false duplicate Fail.
#[test]
fn unique_cap_with_nulls_emits_warn_only_no_false_duplicate_fail() {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));
    let ids = Int64Array::from(vec![Some(1), None, Some(2), None, Some(3)]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids)]).unwrap();

    let mut sink = sink_with_unique_cap(vec!["id".into()], 2);
    sink.quality_unique_indices = vec![(0, "id".into())];
    sink.track_quality(&batch);
    sink.total_rows = 5;
    assert!(
        sink.quality_unique_capped.contains("id"),
        "third distinct value past cap=2 must flag the column as capped"
    );

    let issues = sink.run_quality_checks();
    let messages: Vec<&str> = issues.iter().map(|i| i.message.as_str()).collect();
    assert_eq!(
        issues.len(),
        1,
        "capped column must emit exactly the Warn issue; got: {:?}",
        messages
    );
    let issue = &issues[0];
    assert_eq!(issue.severity, crate::quality::Severity::Warn);
    assert!(
        issue.message.contains("capped"),
        "message must say 'capped'; got: {}",
        issue.message
    );
}
/// Exactly `cap` distinct values followed by NULLs must neither trip the cap
/// nor report duplicates.
#[test]
fn unique_cap_exact_boundary_trailing_nulls_do_not_trip_cap() {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, true)]));
    let ids = Int64Array::from(vec![Some(1), Some(2), Some(3), None, None]);
    let batch = RecordBatch::try_new(schema, vec![Arc::new(ids)]).unwrap();

    let mut sink = sink_with_unique_cap(vec!["id".into()], 3);
    sink.quality_unique_indices = vec![(0, "id".into())];
    sink.track_quality(&batch);
    sink.total_rows = 5;
    assert!(
        !sink.quality_unique_capped.contains("id"),
        "NULL rows after exactly cap distinct values must not trip the cap"
    );

    let issues = sink.run_quality_checks();
    let messages: Vec<&str> = issues.iter().map(|i| i.message.as_str()).collect();
    assert!(
        issues.is_empty(),
        "3 distinct + 2 NULLs at cap=3: no duplicates, no cap warning; got: {:?}",
        messages
    );
}