use crate::core::errors::DataProfilerError;
use bytes::Bytes;
use futures::future::BoxFuture;
use parquet::arrow::async_reader::AsyncFileReader;
use reqwest::{Client, header};
use std::ops::Range;
/// Reads a remote Parquet file over HTTP using byte-range requests.
///
/// Implements `AsyncFileReader` so the parquet crate can fetch only the
/// footer/metadata and the specific byte ranges it needs, instead of
/// downloading the whole file.
#[derive(Clone)]
pub struct HttpParquetReader {
    // HTTP client used for the initial HEAD probe and all ranged GETs.
    client: Client,
    // Absolute URL of the remote Parquet file.
    url: String,
    // Total file size in bytes, taken from the HEAD Content-Length header.
    content_length: usize,
}
impl HttpParquetReader {
    /// Builds a reader for `url` by issuing a HEAD request to discover the
    /// file size up front.
    ///
    /// # Errors
    ///
    /// Fails when the HTTP client cannot be constructed, the HEAD request
    /// fails or returns a non-success status, or the server omits a
    /// parseable `Content-Length` header (required to size range reads).
    pub async fn try_new(url: &str) -> Result<Self, DataProfilerError> {
        // Single place to wrap failures into the crate's error type.
        let to_err = |message: String| DataProfilerError::ParquetError { message };

        let client = Client::builder()
            .connect_timeout(std::time::Duration::from_secs(10))
            .timeout(std::time::Duration::from_secs(30))
            .build()
            .map_err(|e| to_err(format!("Reqwest client builder failed: {}", e)))?;

        let response = client
            .head(url)
            .send()
            .await
            .map_err(|e| to_err(format!("Failed to HEAD url: {}", e)))?;

        let status = response.status();
        if !status.is_success() {
            return Err(to_err(format!("HTTP HEAD request returned status {}", status)));
        }

        // Without a numeric Content-Length we cannot bound range requests,
        // so treat its absence as a hard error.
        let content_length = response
            .headers()
            .get(header::CONTENT_LENGTH)
            .and_then(|value| value.to_str().ok())
            .and_then(|value| value.parse::<usize>().ok())
            .ok_or_else(|| to_err("Server did not provide Content-Length header".to_string()))?;

        Ok(Self {
            client,
            url: url.to_owned(),
            content_length,
        })
    }
}
impl AsyncFileReader for HttpParquetReader {
    /// Fetches `range` from the remote file via an HTTP `Range` request.
    ///
    /// Returns an error when the range lies outside the file, when the
    /// server ignores the `Range` header (answers `200 OK`, which would
    /// mean downloading the whole file), or when the body length does not
    /// match the requested length (a silently truncated body would later
    /// surface as confusing parquet decode errors).
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        Box::pin(async move {
            let limit = self.content_length as u64;
            if range.start > limit {
                return Err(parquet::errors::ParquetError::General(format!(
                    "Out of bounds request start {} vs limit {}",
                    range.start, limit
                )));
            }
            // Empty ranges need no network round-trip.
            if range.start >= range.end {
                return Ok(Bytes::new());
            }
            // Reject ranges running past EOF up front; the server would
            // otherwise answer 416 or truncate the body.
            if range.end > limit {
                return Err(parquet::errors::ParquetError::General(format!(
                    "Out of bounds request end {} vs limit {}",
                    range.end, limit
                )));
            }
            let req = self
                .client
                .get(&self.url)
                .header(
                    reqwest::header::RANGE,
                    // HTTP Range uses an inclusive end, hence `end - 1`.
                    format!("bytes={}-{}", range.start, range.end - 1),
                )
                .build()
                .map_err(|e| {
                    parquet::errors::ParquetError::General(format!("invalid request: {}", e))
                })?;
            let res = self.client.execute(req).await.map_err(|e| {
                parquet::errors::ParquetError::General(format!("network err: {}", e))
            })?;
            if res.status() != reqwest::StatusCode::PARTIAL_CONTENT {
                if res.status() == reqwest::StatusCode::OK {
                    return Err(parquet::errors::ParquetError::General(
                        "Server ignored Range header and returned 200 OK. Aborting to prevent full file download.".to_string(),
                    ));
                }
                return Err(parquet::errors::ParquetError::General(format!(
                    "Expected 206 Partial Content, got {}",
                    res.status()
                )));
            }
            let bytes = res.bytes().await.map_err(|e| {
                parquet::errors::ParquetError::General(format!(
                    "failed to fetch byte body from http stream: {}",
                    e
                ))
            })?;
            // Guard against short (or over-long) reads from buggy servers.
            let expected = (range.end - range.start) as usize;
            if bytes.len() != expected {
                return Err(parquet::errors::ParquetError::General(format!(
                    "Server returned {} bytes for a {} byte range request",
                    bytes.len(),
                    expected
                )));
            }
            Ok(bytes)
        })
    }

    /// Loads the Parquet footer/metadata by delegating to
    /// `ParquetMetaDataReader`, which issues bounded reads through
    /// `get_bytes`. Page indexes are loaded only when enabled via
    /// `options`.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a parquet::arrow::arrow_reader::ArrowReaderOptions>,
    ) -> BoxFuture<
        'a,
        parquet::errors::Result<std::sync::Arc<parquet::file::metadata::ParquetMetaData>>,
    > {
        use futures::FutureExt;
        use parquet::file::metadata::{PageIndexPolicy, ParquetMetaDataReader};
        async move {
            let limit = self.content_length as u64;
            let metadata_opts = options.map(|o| o.metadata_options().clone());
            let metadata_reader = ParquetMetaDataReader::new()
                .with_page_index_policy(PageIndexPolicy::from(
                    options.is_some_and(|o| o.page_index()),
                ))
                .with_metadata_options(metadata_opts);
            let parquet_metadata = metadata_reader.load_and_finish(self, limit).await?;
            Ok(std::sync::Arc::new(parquet_metadata))
        }
        .boxed()
    }
}
use crate::core::report_assembler::ReportAssembler;
use crate::engines::columnar::record_batch_analyzer::RecordBatchAnalyzer;
use crate::parsers::parquet::ParquetConfig;
use crate::types::{
DataSource, ExecutionMetadata, FileFormat, ParquetMetadata, ProfileReport, QualityDimension,
};
use futures::StreamExt;
use parquet::arrow::async_reader::ParquetRecordBatchStreamBuilder;
/// Profiles a Parquet file served over HTTP using the default set of
/// quality dimensions.
///
/// Convenience wrapper around [`analyze_parquet_async_http_dims`] with
/// `quality_dimensions = None`.
pub async fn analyze_parquet_async_http(
    url: &str,
    config: &ParquetConfig,
) -> Result<ProfileReport, DataProfilerError> {
    analyze_parquet_async_http_dims(url, config, None).await
}
/// Profiles a Parquet file served over HTTP.
///
/// Streams record batches via ranged GET requests, feeds them through a
/// `RecordBatchAnalyzer`, and assembles the resulting `ProfileReport`.
/// When `quality_dimensions` is `Some`, only those dimensions are
/// requested from the report assembler.
///
/// # Errors
///
/// Propagates failures from the HTTP reader setup, Parquet stream
/// construction, and batch decoding as `DataProfilerError`.
pub async fn analyze_parquet_async_http_dims(
    url: &str,
    config: &ParquetConfig,
    quality_dimensions: Option<Vec<QualityDimension>>,
) -> Result<ProfileReport, DataProfilerError> {
    let started_at = std::time::Instant::now();

    let reader = HttpParquetReader::try_new(url).await?;
    let content_length = reader.content_length as u64;

    let builder = ParquetRecordBatchStreamBuilder::new(reader)
        .await
        .map_err(|e| DataProfilerError::ParquetError {
            message: format!("Failed to create Parquet stream builder: {}", e),
        })?;

    // Snapshot file-level metadata before the builder is consumed below.
    let parquet_meta = builder.metadata().clone();
    let version = parquet_meta.file_metadata().version();
    let num_row_groups = parquet_meta.num_row_groups();
    // Report the compression of the first column chunk, or UNKNOWN when
    // the file has no row groups / columns.
    let compression = parquet_meta
        .row_groups()
        .first()
        .and_then(|rg| rg.columns().first())
        .map(|col| format!("{:?}", col.compression()))
        .unwrap_or_else(|| "UNKNOWN".to_string());
    let compressed_size_bytes: u64 = parquet_meta
        .row_groups()
        .iter()
        .map(|rg| rg.compressed_size() as u64)
        .sum();
    let schema_summary = builder.schema().to_string();

    let mut stream = builder
        .with_batch_size(config.batch_size)
        .build()
        .map_err(|e| DataProfilerError::ParquetError {
            message: format!("Failed to build Parquet stream: {}", e),
        })?;

    // Single pass over the record batches; the analyzer accumulates all
    // per-column statistics incrementally.
    let mut analyzer = RecordBatchAnalyzer::new();
    while let Some(batch_result) = stream.next().await {
        let batch = batch_result.map_err(|e| DataProfilerError::ParquetError {
            message: format!("Failed to read Parquet async batch: {}", e),
        })?;
        analyzer.process_batch(&batch)?;
    }

    let column_profiles = analyzer.to_profiles(false, false, None);
    let total_rows = analyzer.total_rows();
    let sample_columns = analyzer.create_sample_columns();
    let scan_time_ms = started_at.elapsed().as_millis();
    let num_columns = column_profiles.len();

    let source = DataSource::File {
        path: url.to_string(),
        format: FileFormat::Parquet,
        size_bytes: content_length,
        modified_at: None,
        parquet_metadata: Some(ParquetMetadata {
            num_row_groups,
            compression,
            version,
            schema_summary,
            compressed_size_bytes,
            uncompressed_size_bytes: None,
        }),
    };

    let mut assembler = ReportAssembler::new(
        source,
        ExecutionMetadata::new(total_rows, num_columns, scan_time_ms),
    )
    .columns(column_profiles)
    .with_quality_data(sample_columns);
    if let Some(dims) = quality_dimensions {
        assembler = assembler.with_requested_dimensions(dims);
    }
    Ok(assembler.build())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::{Read, Write};
    use std::net::{TcpListener, TcpStream};
    use std::sync::Arc;
    use std::thread::JoinHandle;
    use std::time::Duration;

    /// Handle to a minimal HTTP server running on a background thread,
    /// serving a fixed byte buffer with HEAD + ranged GET support.
    struct MockServer {
        url: String,
        handle: JoinHandle<()>,
    }

    impl MockServer {
        fn url(&self) -> &str {
            &self.url
        }

        /// Waits for the server thread to finish, propagating any panic
        /// raised inside it (e.g. a protocol expectation failure).
        fn join(self) {
            self.handle.join().unwrap();
        }
    }

    /// Reads one HTTP request's header block (through the terminating
    /// blank line) from `stream`. Bounded by a 2s read timeout and a
    /// 16 KiB size cap so a misbehaving client cannot hang the test.
    fn read_http_request(stream: &mut TcpStream) -> std::io::Result<String> {
        stream.set_read_timeout(Some(Duration::from_secs(2)))?;
        let mut request = Vec::new();
        let mut buffer = [0; 1024];
        loop {
            let bytes_read = stream.read(&mut buffer)?;
            if bytes_read == 0 {
                break;
            }
            request.extend_from_slice(&buffer[..bytes_read]);
            // Headers end at the first CRLF CRLF; HEAD/GET carry no body.
            if request.windows(4).any(|window| window == b"\r\n\r\n") {
                break;
            }
            if request.len() > 16 * 1024 {
                return Err(std::io::Error::new(
                    std::io::ErrorKind::InvalidData,
                    "request headers exceeded test limit",
                ));
            }
        }
        Ok(String::from_utf8_lossy(&request).into_owned())
    }

    /// Extracts `(start, end)` from a `Range: bytes=start-end` header.
    /// Both bounds are inclusive, matching HTTP range semantics.
    fn parse_range_header(request: &str) -> Option<(usize, usize)> {
        let range_line = request
            .lines()
            .find(|line| line.to_ascii_lowercase().starts_with("range: bytes="))?;
        let range_value = range_line.split_once(':')?.1.trim();
        let byte_range = range_value.strip_prefix("bytes=")?;
        let (start, end) = byte_range.split_once('-')?;
        Some((start.parse().ok()?, end.parse().ok()?))
    }

    /// Spawns a server on an ephemeral local port that answers exactly
    /// two requests (one HEAD, one ranged GET) and then exits. Each
    /// response sends `Connection: close`, so the client opens a fresh
    /// connection per request.
    fn spawn_mock_server(data: Vec<u8>) -> MockServer {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let port = listener.local_addr().unwrap().port();
        let data = Arc::new(data);
        let handle = std::thread::spawn(move || {
            for _ in 0..2 {
                let (mut stream, _) = listener.accept().expect("mock server accept failed");
                let request = read_http_request(&mut stream).expect("mock server read failed");
                if request.starts_with("HEAD") {
                    let response = format!(
                        "HTTP/1.1 200 OK\r\nContent-Length: {}\r\nAccept-Ranges: bytes\r\nConnection: close\r\n\r\n",
                        data.len()
                    );
                    stream
                        .write_all(response.as_bytes())
                        .expect("mock server HEAD response failed");
                } else if request.starts_with("GET") {
                    let (start, end) = parse_range_header(&request)
                        .expect("mock server missing or invalid Range header");
                    // Inclusive slice because HTTP range ends are inclusive.
                    let chunk = data
                        .get(start..=end)
                        .expect("mock server requested invalid byte range");
                    let response = format!(
                        "HTTP/1.1 206 Partial Content\r\nContent-Range: bytes {}-{}/{}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
                        start,
                        end,
                        data.len(),
                        chunk.len()
                    );
                    stream
                        .write_all(response.as_bytes())
                        .expect("mock server GET response headers failed");
                    stream
                        .write_all(chunk)
                        .expect("mock server GET response body failed");
                } else {
                    panic!("mock server received unexpected request: {request}");
                }
            }
        });
        MockServer {
            url: format!("http://127.0.0.1:{}", port),
            handle,
        }
    }

    /// End-to-end check: HEAD discovers the content length, and a ranged
    /// GET returns exactly the requested byte window.
    #[tokio::test]
    async fn test_http_parquet_reader_byte_ranges() {
        // 255 bytes of data: 0..255 is an exclusive range.
        let dummy_data: Vec<u8> = (0..255).map(|x| x as u8).collect();
        let server = spawn_mock_server(dummy_data.clone());
        let mut reader = HttpParquetReader::try_new(server.url()).await.unwrap();
        assert_eq!(reader.content_length, 255);
        let bytes = reader.get_bytes(10..20).await.unwrap();
        assert_eq!(bytes.len(), 10);
        assert_eq!(&bytes[..], &dummy_data[10..20]);
        server.join();
    }
}