use std::io::Read;
use std::time::Duration;
use tokio::sync::mpsc;
use crate::core::errors::DataProfilerError;
use crate::core::profile_builder;
use crate::core::progress::{ProgressSink, ProgressTracker};
use crate::core::report_assembler::ReportAssembler;
use crate::core::sampling::{ChunkSize, SamplingStrategy};
use crate::core::stop_condition::{SchemaStabilityTracker, StopCondition, StopEvaluator};
use crate::core::streaming_stats::StreamingColumnCollection;
use crate::types::{
DataSource, ExecutionMetadata, FileFormat, MetricPack, ProfileReport, QualityDimension,
StreamSourceSystem,
};
use super::async_source::AsyncDataSource;
/// A batch of parsed rows handed from the blocking reader task to the async
/// processing loop over the mpsc channel.
struct ParsedChunk {
    // Rows as string fields. The very first chunk a reader task sends
    // carries only the header row.
    records: Vec<Vec<String>>,
    // Approximate number of source bytes this chunk accounts for — a
    // heuristic, not an exact wire count (see the reader tasks).
    bytes_read: u64,
}
/// Builder-style profiler that consumes an async byte stream (CSV, JSON, or
/// JSONL) and produces a `ProfileReport` using bounded-memory streaming
/// statistics. Parsing runs on a blocking task; parsed chunks flow back over
/// a bounded channel for backpressure.
pub struct AsyncStreamingProfiler {
    // Policy for how many rows to batch per parsed chunk (see `rows_per_chunk`).
    chunk_size: ChunkSize,
    // Row-sampling strategy applied while folding records into statistics.
    sampling_strategy: SamplingStrategy,
    // Soft memory budget for the streaming statistics collection.
    memory_limit_mb: usize,
    // Capacity of the reader -> processor chunk channel (clamped to >= 1).
    channel_capacity: usize,
    // Where progress events are delivered.
    progress_sink: ProgressSink,
    // Minimum interval between progress emissions.
    progress_interval: Duration,
    // Optional early-stop condition (row/byte/memory/schema-stability based).
    stop_condition: StopCondition,
    // Optional restriction of quality assessment to specific dimensions.
    quality_dimensions: Option<Vec<QualityDimension>>,
    // Optional restriction of computed metrics to specific packs.
    metric_packs: Option<Vec<MetricPack>>,
}
impl AsyncStreamingProfiler {
    /// Creates a profiler with conservative defaults: adaptive chunk sizing,
    /// no sampling, a 256 MB memory budget, a 4-chunk channel, no progress
    /// reporting (500 ms interval once a sink is configured), no early-stop
    /// condition, and no explicit quality-dimension or metric-pack selection.
    pub fn new() -> Self {
        AsyncStreamingProfiler {
            metric_packs: None,
            quality_dimensions: None,
            stop_condition: StopCondition::Never,
            progress_interval: Duration::from_millis(500),
            progress_sink: ProgressSink::None,
            channel_capacity: 4,
            memory_limit_mb: 256,
            sampling_strategy: SamplingStrategy::None,
            chunk_size: ChunkSize::default(),
        }
    }
pub fn chunk_size(mut self, chunk_size: ChunkSize) -> Self {
self.chunk_size = chunk_size;
self
}
pub fn sampling(mut self, strategy: SamplingStrategy) -> Self {
self.sampling_strategy = strategy;
self
}
pub fn memory_limit_mb(mut self, limit: usize) -> Self {
self.memory_limit_mb = limit;
self
}
pub fn channel_capacity(mut self, capacity: usize) -> Self {
self.channel_capacity = capacity.max(1);
self
}
pub fn progress(mut self, sink: ProgressSink, interval: Duration) -> Self {
self.progress_sink = sink;
self.progress_interval = interval;
self
}
pub fn stop_condition(mut self, condition: StopCondition) -> Self {
self.stop_condition = condition;
self
}
pub fn quality_dimensions(mut self, dims: Vec<QualityDimension>) -> Self {
self.quality_dimensions = Some(dims);
self
}
pub fn metric_packs(mut self, packs: Vec<MetricPack>) -> Self {
self.metric_packs = Some(packs);
self
}
/// Profiles the given async source end-to-end and assembles a `ProfileReport`.
///
/// Only `Csv`, `Json`, and `Jsonl` formats are accepted; anything else is
/// rejected up front. Parsing runs on a blocking task (the async reader is
/// bridged to sync I/O); parsed chunks flow back over a bounded mpsc channel
/// and are folded into streaming statistics by `process_chunks`.
///
/// # Errors
/// Returns `StreamingError` for unsupported formats, a panicked reader task,
/// or an empty/headerless stream, plus any parsing error surfaced by the
/// reader tasks.
pub async fn analyze_stream(
    &self,
    source: impl AsyncDataSource,
) -> Result<ProfileReport, DataProfilerError> {
    let source_info = source.source_info();
    let format = source_info.format.clone();
    // Gate on supported formats before doing any I/O.
    match format {
        FileFormat::Csv | FileFormat::Json | FileFormat::Jsonl => {}
        _ => {
            return Err(DataProfilerError::StreamingError {
                message: format!(
                    "AsyncStreamingProfiler does not support {:?} format",
                    format
                ),
            });
        }
    }
    let start = std::time::Instant::now();
    let reader = source.into_async_read().await?;
    let rows_per_chunk = self.rows_per_chunk();
    // Bounded channel: provides backpressure from processing to parsing.
    let (tx, rx) = mpsc::channel::<ParsedChunk>(self.channel_capacity);
    // Bridge the async reader into a blocking `Read` for the parser task.
    let sync_reader = tokio_util::io::SyncIoBridge::new(reader);
    let reader_handle = tokio::task::spawn_blocking(move || match format {
        FileFormat::Csv => Self::reader_task(sync_reader, tx, rows_per_chunk),
        FileFormat::Json | FileFormat::Jsonl => {
            Self::json_reader_task(sync_reader, tx, rows_per_chunk, format)
        }
        // Unreachable: other formats were rejected by the gate above.
        _ => unreachable!(),
    });
    let process_result = self.process_chunks(rx, source_info.size_hint).await;
    let (_headers, column_stats, total_rows, sampled_rows, total_bytes, truncation_reason) =
        match process_result {
            Ok(result) => result,
            Err(e) => {
                // NOTE(review): abort() cannot interrupt an already-running
                // spawn_blocking closure; the reader also winds down on its
                // own once the receiver is dropped and sends start failing.
                reader_handle.abort();
                return Err(e);
            }
        };
    match reader_handle.await {
        Ok(Ok(())) => {}
        Ok(Err(e)) => return Err(e),
        Err(join_err) if join_err.is_cancelled() => {
            // Task was cancelled after an early stop — nothing to surface.
        }
        Err(join_err) => {
            return Err(DataProfilerError::StreamingError {
                message: format!("Reader task panicked: {}", join_err),
            });
        }
    }
    let packs = self.metric_packs.as_deref();
    let skip_stats = !MetricPack::include_statistics(packs);
    let skip_patterns = !MetricPack::include_patterns(packs);
    let column_profiles = profile_builder::profiles_from_streaming(
        &column_stats,
        skip_stats,
        skip_patterns,
        None,
    );
    let sample_columns = profile_builder::quality_check_samples(&column_stats);
    let scan_time_ms = start.elapsed().as_millis();
    let num_columns = column_profiles.len();
    let data_source = DataSource::Stream {
        topic: source_info.label,
        batch_id: uuid::Uuid::new_v4().to_string(),
        partition: None,
        consumer_group: None,
        source_system: source_info
            .source_system
            .unwrap_or(StreamSourceSystem::Http),
        session_id: None,
        first_record_at: None,
        last_record_at: None,
    };
    let mut execution = ExecutionMetadata::new(sampled_rows, num_columns, scan_time_ms)
        .with_bytes_consumed(total_bytes);
    if let Some(reason) = truncation_reason {
        execution = execution.with_truncation(reason);
    } else if total_rows > 0 && sampled_rows < total_rows {
        // Sampling skipped some rows; record the effective ratio.
        // NOTE(review): the stream itself WAS fully read on this path, so
        // `with_source_exhausted(false)` looks questionable — confirm the
        // intended semantics of `source_exhausted` under sampling.
        let ratio = sampled_rows as f64 / total_rows as f64;
        execution = execution.with_sampling(ratio).with_source_exhausted(false);
    }
    let mut assembler = ReportAssembler::new(data_source, execution)
        .columns(column_profiles)
        .with_quality_data(sample_columns);
    if let Some(ref dims) = self.quality_dimensions {
        assembler = assembler.with_requested_dimensions(dims.clone());
    }
    Ok(assembler.build())
}
/// Blocking task: parses CSV from the bridged reader and ships row chunks
/// over `tx`. The first chunk sent contains only the header row (with
/// `bytes_read: 0`); subsequent chunks hold up to `rows_per_chunk` records.
///
/// A failed `blocking_send` means the receiver was dropped (early stop on
/// the processing side) and is treated as a clean shutdown, not an error.
fn reader_task(
    sync_reader: tokio_util::io::SyncIoBridge<
        std::pin::Pin<Box<dyn tokio::io::AsyncRead + Send + Unpin>>,
    >,
    tx: mpsc::Sender<ParsedChunk>,
    rows_per_chunk: usize,
) -> Result<(), DataProfilerError> {
    // `flexible(true)` tolerates rows with varying field counts.
    let mut csv_reader = csv::ReaderBuilder::new()
        .has_headers(true)
        .flexible(true)
        .from_reader(sync_reader);
    let headers = csv_reader
        .headers()
        .map_err(|e| DataProfilerError::CsvParsingError {
            message: e.to_string(),
            suggestion: "Check CSV formatting in the stream data".to_string(),
        })?;
    let header_fields: Vec<String> = headers.iter().map(|f| f.to_string()).collect();
    let header_chunk = ParsedChunk {
        records: vec![header_fields],
        bytes_read: 0,
    };
    if tx.blocking_send(header_chunk).is_err() {
        // Receiver already gone — stop quietly.
        return Ok(());
    }
    let mut current_chunk: Vec<Vec<String>> = Vec::with_capacity(rows_per_chunk);
    let mut bytes_in_chunk: u64 = 0;
    for result in csv_reader.records() {
        let record = result.map_err(|e| DataProfilerError::CsvParsingError {
            message: e.to_string(),
            suggestion: "Check CSV formatting in the stream data".to_string(),
        })?;
        // Byte accounting is a heuristic: unquoted field bytes plus one for
        // the line terminator; delimiters and quoting are not counted.
        bytes_in_chunk += record.as_slice().len() as u64 + 1;
        let fields: Vec<String> = record.iter().map(|f| f.to_string()).collect();
        current_chunk.push(fields);
        if current_chunk.len() >= rows_per_chunk {
            let chunk = ParsedChunk {
                // Swap in a fresh pre-sized buffer and ship the full one.
                records: std::mem::replace(
                    &mut current_chunk,
                    Vec::with_capacity(rows_per_chunk),
                ),
                bytes_read: bytes_in_chunk,
            };
            bytes_in_chunk = 0;
            if tx.blocking_send(chunk).is_err() {
                return Ok(());
            }
        }
    }
    // Flush any trailing partial chunk; ignore send failure during shutdown.
    if !current_chunk.is_empty() {
        let chunk = ParsedChunk {
            records: current_chunk,
            bytes_read: bytes_in_chunk,
        };
        let _ = tx.blocking_send(chunk);
    }
    Ok(())
}
/// Blocking task: parses a top-level JSON array (`Json`) or a stream of
/// concatenated values (`Jsonl`) from the bridged reader and ships row
/// chunks over `tx`.
///
/// Column order is discovered from object keys as they are first seen; once
/// the header chunk has been sent the column set is frozen and keys not yet
/// seen are dropped from later rows. Nulls become empty strings; nested
/// arrays/objects are kept as raw JSON text.
fn json_reader_task(
    sync_reader: tokio_util::io::SyncIoBridge<
        std::pin::Pin<Box<dyn tokio::io::AsyncRead + Send + Unpin>>,
    >,
    tx: mpsc::Sender<ParsedChunk>,
    rows_per_chunk: usize,
    format: FileFormat,
) -> Result<(), DataProfilerError> {
    use serde_json::Value;
    use std::io::BufRead;
    let mut buf_reader = std::io::BufReader::new(sync_reader);
    let mut known_columns: Vec<String> = Vec::new();
    let mut current_chunk: Vec<Vec<String>> = Vec::with_capacity(rows_per_chunk);
    // Set mirrors `known_columns` for O(1) de-duplication while the Vec
    // preserves first-seen column order.
    let mut known_columns_set: std::collections::HashSet<String> =
        std::collections::HashSet::new();
    let mut bytes_in_chunk: u64 = 0;
    let mut headers_sent = false;
    // Flattens one JSON object into a row following `known_cols` order,
    // growing the column list only while headers have not been sent yet.
    let process_object = |obj: &serde_json::Map<String, Value>,
                          known_cols: &mut Vec<String>,
                          known_cols_set: &mut std::collections::HashSet<String>,
                          is_headers_sent: bool|
     -> Vec<String> {
        if !is_headers_sent {
            for key in obj.keys() {
                if known_cols_set.insert(key.clone()) {
                    known_cols.push(key.clone());
                }
            }
        }
        known_cols
            .iter()
            .map(|col| {
                obj.get(col)
                    .map(|v| match v {
                        Value::Null => String::new(),
                        Value::Bool(b) => b.to_string(),
                        Value::Number(n) => n.to_string(),
                        Value::String(s) => s.to_string(),
                        // Arrays/objects are re-serialized to JSON text.
                        _ => serde_json::to_string(v).unwrap_or_default(),
                    })
                    .unwrap_or_default()
            })
            .collect()
    };
    // Sends the header chunk (once) and then the buffered data chunk.
    // Returns Ok(false) when the receiver has been dropped (early stop).
    let send_chunk = |chunk: &mut Vec<Vec<String>>,
                      bytes: &mut u64,
                      cols: &[String],
                      headers_sent: &mut bool,
                      tx: &mpsc::Sender<ParsedChunk>|
     -> Result<bool, DataProfilerError> {
        if !*headers_sent && !cols.is_empty() {
            let header_chunk = ParsedChunk {
                records: vec![cols.to_vec()],
                bytes_read: 0,
            };
            if tx.blocking_send(header_chunk).is_err() {
                return Ok(false);
            }
            *headers_sent = true;
        }
        if !chunk.is_empty() {
            let data_chunk = ParsedChunk {
                records: std::mem::replace(chunk, Vec::with_capacity(rows_per_chunk)),
                bytes_read: *bytes,
            };
            *bytes = 0;
            if tx.blocking_send(data_chunk).is_err() {
                return Ok(false);
            }
        }
        Ok(true)
    };
    match format {
        FileFormat::Jsonl => {
            // NOTE(review): take(u64::MAX) imposes no effective limit —
            // presumably only wraps the reader in a `Take` adapter; confirm
            // it can be removed.
            let mut reader = buf_reader.take(u64::MAX);
            let de = serde_json::Deserializer::from_reader(&mut reader);
            for value in de.into_iter::<Value>() {
                let v = value.map_err(|e| DataProfilerError::JsonParsingError {
                    message: e.to_string(),
                })?;
                // Non-object top-level values are silently skipped.
                if let Value::Object(obj) = v {
                    let row = process_object(
                        &obj,
                        &mut known_columns,
                        &mut known_columns_set,
                        headers_sent,
                    );
                    // Heuristic byte accounting: field bytes plus 4 per
                    // field for punctuation/overhead.
                    bytes_in_chunk += row.iter().map(|s| s.len() as u64 + 4).sum::<u64>();
                    current_chunk.push(row);
                    if current_chunk.len() >= rows_per_chunk
                        && !send_chunk(
                            &mut current_chunk,
                            &mut bytes_in_chunk,
                            &known_columns,
                            &mut headers_sent,
                            &tx,
                        )?
                    {
                        return Ok(());
                    }
                }
            }
        }
        _ => {
            // JSON: hand-rolled scan for a top-level `[`, then deserialize
            // one element at a time so the whole document is never buffered.
            let mut found_array = false;
            loop {
                let mut consume = 0;
                {
                    let buf = buf_reader.fill_buf().map_err(DataProfilerError::from)?;
                    if buf.is_empty() {
                        break;
                    }
                    for &b in buf {
                        consume += 1;
                        if b == b'[' {
                            found_array = true;
                            break;
                        } else if !b.is_ascii_whitespace() {
                            // First non-whitespace byte is not '[': input is
                            // not a top-level array, so the scan gives up
                            // (that byte is still consumed below).
                            break;
                        }
                    }
                }
                buf_reader.consume(consume);
                bytes_in_chunk += consume as u64;
                if found_array || consume == 0 {
                    break;
                }
            }
            if found_array {
                loop {
                    let mut consume = 0;
                    let mut found_value = false;
                    let mut end_of_array = false;
                    {
                        let buf = buf_reader.fill_buf().map_err(DataProfilerError::from)?;
                        if buf.is_empty() {
                            break;
                        }
                        // Skip whitespace and separators until the next
                        // element or the closing bracket.
                        for &b in buf {
                            if b.is_ascii_whitespace() || b == b',' {
                                consume += 1;
                            } else if b == b']' {
                                end_of_array = true;
                                consume += 1;
                                break;
                            } else {
                                found_value = true;
                                break;
                            }
                        }
                    }
                    buf_reader.consume(consume);
                    bytes_in_chunk += consume as u64;
                    if end_of_array {
                        break;
                    }
                    if found_value {
                        let mut de = serde_json::Deserializer::from_reader(&mut buf_reader);
                        match serde::Deserialize::deserialize(&mut de) {
                            Ok(Value::Object(obj)) => {
                                let row = process_object(
                                    &obj,
                                    &mut known_columns,
                                    &mut known_columns_set,
                                    headers_sent,
                                );
                                bytes_in_chunk +=
                                    row.iter().map(|s| s.len() as u64 + 4).sum::<u64>();
                                current_chunk.push(row);
                                if current_chunk.len() >= rows_per_chunk
                                    && !send_chunk(
                                        &mut current_chunk,
                                        &mut bytes_in_chunk,
                                        &known_columns,
                                        &mut headers_sent,
                                        &tx,
                                    )?
                                {
                                    return Ok(());
                                }
                            }
                            Ok(_) => {
                                // Skipped a non-object element; charge a
                                // nominal 10 bytes since its size is unknown.
                                bytes_in_chunk += 10;
                            }
                            Err(_) => {
                                // Malformed tail — stop parsing best-effort
                                // rather than failing the whole profile.
                                break;
                            }
                        }
                    }
                }
            }
        }
    }
    // Flush the trailing partial chunk, or at least the header row if no
    // data chunk was ever sent.
    if !current_chunk.is_empty() || !headers_sent {
        let _ = send_chunk(
            &mut current_chunk,
            &mut bytes_in_chunk,
            &known_columns,
            &mut headers_sent,
            &tx,
        );
    }
    Ok(())
}
/// Async consumer: receives parsed chunks, applies sampling, folds records
/// into streaming column statistics, and enforces stop conditions.
///
/// Returns `(headers, column_stats, total_rows, sampled_rows, total_bytes,
/// truncation_reason)`. The first chunk received must be the header row;
/// a closed channel before that is reported as an error.
async fn process_chunks(
    &self,
    mut rx: mpsc::Receiver<ParsedChunk>,
    size_hint: Option<u64>,
) -> Result<
    (
        Vec<String>,
        StreamingColumnCollection,
        usize,
        usize,
        u64,
        Option<crate::types::TruncationReason>,
    ),
    DataProfilerError,
> {
    let mut column_stats = StreamingColumnCollection::memory_limit(self.memory_limit_mb);
    let mut progress_tracker =
        ProgressTracker::new(self.progress_sink.clone(), self.progress_interval);
    let mut total_rows: usize = 0;
    let mut sampled_rows: usize = 0;
    let mut total_bytes: u64 = 0;
    // Rough row estimate from the byte hint, assuming ~50 bytes per row
    // (kept consistent with the heuristic in `rows_per_chunk`).
    let estimated_total = size_hint.map(|total| total / 50);
    let mut stop_eval = StopEvaluator::new(self.stop_condition.clone());
    if let Some(est) = estimated_total {
        stop_eval = stop_eval.with_estimated_total(est);
    }
    let mut schema_tracker = SchemaStabilityTracker::from_condition(&self.stop_condition);
    let mut truncation_reason: Option<crate::types::TruncationReason> = None;
    // The reader tasks always send the header row first.
    let header_chunk = rx
        .recv()
        .await
        .ok_or_else(|| DataProfilerError::StreamingError {
            message: "Stream ended before any data was received".to_string(),
        })?;
    total_bytes += header_chunk.bytes_read;
    if header_chunk.records.is_empty() {
        return Err(DataProfilerError::StreamingError {
            message: "Stream header chunk was empty".to_string(),
        });
    }
    let headers: Vec<String> = header_chunk.records.into_iter().next().unwrap_or_default();
    if headers.is_empty() {
        return Err(DataProfilerError::StreamingError {
            message: "No column headers found in stream".to_string(),
        });
    }
    let estimated_total_rows = size_hint.map(|total| {
        // Same ~50 bytes/row heuristic as above.
        (total as usize) / 50
    });
    progress_tracker.emit_started(estimated_total_rows, size_hint);
    progress_tracker.emit_schema(headers.clone());
    while let Some(chunk) = rx.recv().await {
        total_bytes += chunk.bytes_read;
        let chunk_rows = chunk.records.len();
        let chunk_bytes = chunk.bytes_read;
        for (row_idx, values) in chunk.records.into_iter().enumerate() {
            let global_row_idx = total_rows + row_idx;
            // NOTE(review): the second argument looks like "rows seen so
            // far" rather than a grand total — confirm against
            // `SamplingStrategy::should_include`'s contract.
            if !self
                .sampling_strategy
                .should_include(global_row_idx, global_row_idx + 1)
            {
                continue;
            }
            column_stats.process_record(&headers, values);
            sampled_rows += 1;
        }
        total_rows += chunk_rows;
        // Shed detail when the collection exceeds its soft memory limit.
        if column_stats.is_memory_pressure() {
            column_stats.reduce_memory_usage();
        }
        let memory_fraction = if self.memory_limit_mb > 0 {
            column_stats.memory_usage_bytes() as f64
                / (self.memory_limit_mb * 1024 * 1024) as f64
        } else {
            0.0
        };
        if stop_eval.update(chunk_rows as u64, chunk_bytes, memory_fraction) {
            truncation_reason = stop_eval.truncation_reason();
            // Dropping the receiver makes the reader's next send fail,
            // shutting the blocking task down cleanly.
            drop(rx);
            break;
        }
        if let Some(ref mut tracker) = schema_tracker {
            let fingerprint = column_stats.column_type_fingerprint();
            if tracker.update(fingerprint, chunk_rows as u64) {
                truncation_reason = Some(tracker.truncation_reason());
                drop(rx);
                break;
            }
        }
        progress_tracker.emit_chunk(chunk_rows, chunk_bytes, estimated_total_rows);
    }
    progress_tracker.emit_finished(truncation_reason.is_some());
    Ok((
        headers,
        column_stats,
        total_rows,
        sampled_rows,
        total_bytes,
        truncation_reason,
    ))
}
/// Converts the configured `ChunkSize` into a number of rows per parsed
/// chunk, using a ~50-bytes-per-row heuristic and a floor of 100 rows.
fn rows_per_chunk(&self) -> usize {
    // Assumed average encoded size of one row; converts a byte budget
    // into a row count.
    const ASSUMED_BYTES_PER_ROW: usize = 50;
    // Lower bound so chunks never get pathologically small.
    const MIN_ROWS_PER_CHUNK: usize = 100;
    match self.chunk_size {
        ChunkSize::Adaptive => 8192,
        ChunkSize::Fixed(bytes) => (bytes / ASSUMED_BYTES_PER_ROW).max(MIN_ROWS_PER_CHUNK),
        ChunkSize::Custom(f) => f(0).max(MIN_ROWS_PER_CHUNK),
    }
}
}
impl Default for AsyncStreamingProfiler {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engines::streaming::async_source::{AsyncSourceInfo, BytesSource};
    use crate::types::DataType;

    /// Builds an in-memory CSV source with a byte-length size hint.
    fn csv_source(data: &'static [u8]) -> BytesSource {
        BytesSource::new(
            bytes::Bytes::from_static(data),
            AsyncSourceInfo {
                label: "test".into(),
                format: FileFormat::Csv,
                size_hint: Some(data.len() as u64),
                source_system: None,
                has_header: None,
            },
        )
    }

    #[tokio::test]
    async fn test_basic_csv_profiling() {
        let source = csv_source(b"name,age,salary\nAlice,30,50000\nBob,25,60000\nCarol,35,55000\n");
        let profiler = AsyncStreamingProfiler::new();
        let report = profiler.analyze_stream(source).await.unwrap();
        assert_eq!(report.column_profiles.len(), 3);
        assert_eq!(report.execution.columns_detected, 3);
        let age_col = report
            .column_profiles
            .iter()
            .find(|p| p.name == "age")
            .expect("age column");
        assert_eq!(age_col.data_type, DataType::Integer);
        assert_eq!(age_col.total_count, 3);
        assert_eq!(age_col.null_count, 0);
    }

    #[tokio::test]
    async fn test_empty_input() {
        // An empty stream produces no header chunk, which is an error.
        let source = csv_source(b"");
        let profiler = AsyncStreamingProfiler::new();
        let result = profiler.analyze_stream(source).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_headers_only() {
        // A header row with no data rows is valid but yields no profiles.
        let source = csv_source(b"name,age,salary\n");
        let profiler = AsyncStreamingProfiler::new();
        let report = profiler.analyze_stream(source).await.unwrap();
        assert_eq!(report.column_profiles.len(), 0);
        assert_eq!(report.execution.rows_processed, 0);
    }

    #[tokio::test]
    async fn test_quoted_newlines_rfc4180() {
        // Embedded newlines inside quoted fields must not split records.
        let source =
            csv_source(b"name,bio,age\nAlice,\"likes\ncats\",30\nBob,\"no\nnewlines\njk\",25\n");
        let profiler = AsyncStreamingProfiler::new();
        let report = profiler.analyze_stream(source).await.unwrap();
        assert_eq!(report.column_profiles.len(), 3);
        assert_eq!(report.execution.rows_processed, 2);
        let bio_col = report
            .column_profiles
            .iter()
            .find(|p| p.name == "bio")
            .expect("bio column");
        assert_eq!(bio_col.total_count, 2);
    }

    #[tokio::test]
    async fn test_large_synthetic_stream() {
        // 10k rows under a tight memory limit exercises chunking and the
        // memory-pressure path.
        let mut data = String::from("id,value,label\n");
        for i in 0..10_000 {
            data.push_str(&format!("{},{},item_{}\n", i, i * 10 + 5, i));
        }
        let source = BytesSource::new(
            bytes::Bytes::from(data),
            AsyncSourceInfo {
                label: "large-test".into(),
                format: FileFormat::Csv,
                size_hint: None,
                source_system: None,
                has_header: None,
            },
        );
        let profiler = AsyncStreamingProfiler::new().memory_limit_mb(16);
        let report = profiler.analyze_stream(source).await.unwrap();
        assert_eq!(report.column_profiles.len(), 3);
        let id_col = report
            .column_profiles
            .iter()
            .find(|p| p.name == "id")
            .expect("id column");
        assert_eq!(id_col.data_type, DataType::Integer);
    }

    #[tokio::test]
    async fn test_channel_capacity_one() {
        // Minimum channel capacity still completes (backpressure works).
        let source =
            csv_source(b"a,b\n1,2\n3,4\n5,6\n7,8\n9,10\n11,12\n13,14\n15,16\n17,18\n19,20\n");
        let profiler = AsyncStreamingProfiler::new().channel_capacity(1);
        let report = profiler.analyze_stream(source).await.unwrap();
        assert_eq!(report.column_profiles.len(), 2);
    }

    #[tokio::test]
    async fn test_progress_events_fire() {
        use crate::core::progress::{ProgressEvent, ProgressSink};
        use std::sync::Arc;
        let progress_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let count_clone = progress_count.clone();
        let mut data = String::from("x,y\n");
        for i in 0..1000 {
            data.push_str(&format!("{},{}\n", i, i * 2));
        }
        let source = BytesSource::new(
            bytes::Bytes::from(data),
            AsyncSourceInfo {
                label: "progress-test".into(),
                format: FileFormat::Csv,
                size_hint: None,
                source_system: None,
                has_header: None,
            },
        );
        let sink = ProgressSink::Callback(Arc::new(move |_event: ProgressEvent| {
            count_clone.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        }));
        // Zero interval forces an emission for every chunk.
        let profiler =
            AsyncStreamingProfiler::new().progress(sink, std::time::Duration::from_millis(0));
        let _report = profiler.analyze_stream(source).await.unwrap();
        // At minimum the started + finished events should fire.
        assert!(progress_count.load(std::sync::atomic::Ordering::Relaxed) >= 2);
    }

    #[tokio::test]
    async fn test_unsupported_format_rejected() {
        // NOTE(review): `FileFormat::Json` IS accepted by the format gate in
        // `analyze_stream`; this errors because `{}` is not a top-level JSON
        // array, so no header chunk is ever produced. The test name is
        // misleading — consider renaming it or using a genuinely
        // unsupported format.
        let source = BytesSource::new(
            bytes::Bytes::from_static(b"{}"),
            AsyncSourceInfo {
                label: "json-test".into(),
                format: FileFormat::Json,
                size_hint: None,
                source_system: None,
                has_header: None,
            },
        );
        let profiler = AsyncStreamingProfiler::new();
        let result = profiler.analyze_stream(source).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_early_stop_max_rows() {
        let mut data = String::from("id,value\n");
        for i in 0..10_000 {
            data.push_str(&format!("{},val_{}\n", i, i));
        }
        let source = BytesSource::new(
            bytes::Bytes::from(data),
            AsyncSourceInfo {
                label: "stop-test".into(),
                format: FileFormat::Csv,
                size_hint: None,
                source_system: None,
                has_header: None,
            },
        );
        let profiler = AsyncStreamingProfiler::new().stop_condition(StopCondition::MaxRows(100));
        let report = profiler.analyze_stream(source).await.unwrap();
        // Stop conditions are evaluated per chunk, so more than 100 rows may
        // be processed — just not all 10k.
        assert!(
            report.execution.rows_processed < 10_000,
            "Should stop before processing all rows, got {}",
            report.execution.rows_processed
        );
        assert!(!report.execution.source_exhausted);
        assert!(matches!(
            report.execution.truncation_reason,
            Some(crate::types::TruncationReason::MaxRows(100))
        ));
    }
}