use std::pin::Pin;
use tokio::io::AsyncRead;
use crate::core::errors::DataProfilerError;
use crate::types::{FileFormat, StreamSourceSystem};
/// Descriptive metadata that travels alongside an asynchronous data source.
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal; use [`AsyncSourceInfo::new`] plus the builder
/// methods instead.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct AsyncSourceInfo {
/// Label identifying the source (the tests in this file use a buffer name
/// or a file path).
pub label: String,
/// Declared format of the data.
pub format: FileFormat,
/// Size of the source when known; the tests in this file store the byte
/// length here.
pub size_hint: Option<u64>,
/// Originating system, when specified.
/// NOTE(review): semantics are defined by `StreamSourceSystem`, which is
/// not visible in this file.
pub source_system: Option<StreamSourceSystem>,
/// Whether the data has a header, when known.
/// NOTE(review): presumably a header row for tabular formats; confirm.
pub has_header: Option<bool>,
}
impl AsyncSourceInfo {
    /// Creates metadata with the given `label` and `format`.
    ///
    /// `size_hint`, `source_system` and `has_header` all start as `None`;
    /// chain the builder methods on this type to fill them in.
    #[must_use]
    pub fn new(label: impl Into<String>, format: FileFormat) -> Self {
        Self {
            label: label.into(),
            format,
            size_hint: None,
            source_system: None,
            has_header: None,
        }
    }

    /// Replaces the size hint and returns the updated value.
    ///
    /// This setter takes an `Option`, so passing `None` clears a previously
    /// set hint.
    #[must_use = "builder methods return the updated value instead of mutating in place"]
    pub fn size_hint(mut self, size: Option<u64>) -> Self {
        self.size_hint = size;
        self
    }

    /// Records the originating system and returns the updated value.
    #[must_use = "builder methods return the updated value instead of mutating in place"]
    pub fn source_system(mut self, system: StreamSourceSystem) -> Self {
        self.source_system = Some(system);
        self
    }

    /// Records whether the data has a header and returns the updated value.
    #[must_use = "builder methods return the updated value instead of mutating in place"]
    pub fn has_header(mut self, has: bool) -> Self {
        self.has_header = Some(has);
        self
    }
}
impl Default for AsyncSourceInfo {
    /// Returns metadata with an empty label, a `FileFormat::Unknown` format
    /// carrying an empty string, and every optional field unset.
    fn default() -> Self {
        // `new` initialises all optional fields to `None`, which is exactly
        // the default state, so only the label and format are supplied here.
        Self::new(String::new(), FileFormat::Unknown(String::new()))
    }
}
/// A consumable source of bytes that can be read asynchronously.
///
/// Implementors must be `Send`. A source carries descriptive metadata
/// (`source_info`) and can be turned into a reader exactly once, because
/// `into_async_read` takes `self` by value.
#[async_trait::async_trait]
pub trait AsyncDataSource: Send {
/// Consumes the source and returns a pinned, boxed reader over its bytes.
///
/// # Errors
///
/// Returns a [`DataProfilerError`] when the implementation cannot produce
/// a reader.
async fn into_async_read(
self,
) -> Result<Pin<Box<dyn AsyncRead + Send + Unpin>>, DataProfilerError>;
/// Returns, by value, the metadata describing this source.
fn source_info(&self) -> AsyncSourceInfo;
}
/// In-memory data source: a `bytes::Bytes` buffer paired with its metadata.
#[derive(Debug, Clone)]
pub struct BytesSource {
/// Raw contents that the reader returned by `into_async_read` is built over.
data: bytes::Bytes,
/// Metadata handed back (cloned) by `source_info`.
info: AsyncSourceInfo,
}
impl BytesSource {
pub fn new(data: bytes::Bytes, info: AsyncSourceInfo) -> Self {
Self { data, info }
}
}
#[async_trait::async_trait]
impl AsyncDataSource for BytesSource {
    /// Wraps the owned buffer in a `std::io::Cursor` and returns it as the
    /// reader. This implementation always returns `Ok`.
    async fn into_async_read(
        self,
    ) -> Result<Pin<Box<dyn AsyncRead + Send + Unpin>>, DataProfilerError> {
        let reader: Pin<Box<dyn AsyncRead + Send + Unpin>> =
            Box::pin(std::io::Cursor::new(self.data));
        Ok(reader)
    }

    /// Returns a clone of the metadata supplied at construction.
    fn source_info(&self) -> AsyncSourceInfo {
        Clone::clone(&self.info)
    }
}
/// Lets a `(tokio::fs::File, AsyncSourceInfo)` pair act as a data source: the
/// file supplies the bytes and the second element supplies the metadata.
#[async_trait::async_trait]
impl AsyncDataSource for (tokio::fs::File, AsyncSourceInfo) {
    /// Hands the already-opened file back as the reader. This implementation
    /// always returns `Ok`.
    async fn into_async_read(
        self,
    ) -> Result<Pin<Box<dyn AsyncRead + Send + Unpin>>, DataProfilerError> {
        let (file, _info) = self;
        Ok(Box::pin(file))
    }

    /// Returns a clone of the metadata stored in the pair.
    fn source_info(&self) -> AsyncSourceInfo {
        let (_, info) = self;
        info.clone()
    }
}
/// Data source backed by a `reqwest::Response`, paired with its metadata.
///
/// Only compiled when the `parquet-async` feature is enabled.
#[cfg(feature = "parquet-async")]
pub struct ReqwestSource {
/// Response whose body is streamed by `into_async_read`.
response: reqwest::Response,
/// Metadata handed back (cloned) by `source_info`.
info: AsyncSourceInfo,
}
#[cfg(feature = "parquet-async")]
impl ReqwestSource {
    /// Pairs an HTTP response with the metadata that describes it.
    ///
    /// Both arguments are stored as given; the response is not inspected.
    pub fn new(response: reqwest::Response, info: AsyncSourceInfo) -> Self {
        ReqwestSource { response, info }
    }
}
#[cfg(feature = "parquet-async")]
#[async_trait::async_trait]
impl AsyncDataSource for ReqwestSource {
    /// Exposes the HTTP response body as an async reader.
    ///
    /// Errors produced by the body stream are converted with
    /// `std::io::Error::other`, so they surface from reads on the returned
    /// reader. This method itself always returns `Ok`.
    async fn into_async_read(
        self,
    ) -> Result<Pin<Box<dyn AsyncRead + Send + Unpin>>, DataProfilerError> {
        use futures::TryStreamExt;
        use tokio_util::io::StreamReader;

        let reader = StreamReader::new(
            self.response
                .bytes_stream()
                .map_err(std::io::Error::other),
        );
        Ok(Box::pin(reader))
    }

    /// Returns a clone of the metadata supplied at construction.
    fn source_info(&self) -> AsyncSourceInfo {
        Clone::clone(&self.info)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    /// Bytes held in memory come back unchanged through `BytesSource`, and the
    /// metadata given to the constructor is what `source_info` reports.
    #[tokio::test]
    async fn test_bytes_source_roundtrip() {
        const PAYLOAD: &[u8] = b"name,age\nAlice,30\nBob,25\n";
        let expected_len = PAYLOAD.len() as u64;

        let meta =
            AsyncSourceInfo::new("test-buffer", FileFormat::Csv).size_hint(Some(expected_len));
        let source = BytesSource::new(bytes::Bytes::from_static(PAYLOAD), meta);

        let reported = source.source_info();
        assert_eq!(reported.label, "test-buffer");
        assert_eq!(reported.size_hint, Some(expected_len));

        let mut reader = source.into_async_read().await.unwrap();
        let mut text = String::new();
        reader.read_to_string(&mut text).await.unwrap();
        assert_eq!(text, "name,age\nAlice,30\nBob,25\n");
    }

    /// A `(tokio::fs::File, AsyncSourceInfo)` pair reads back the rows that
    /// were written to the underlying temporary file.
    #[tokio::test]
    async fn test_file_source() {
        use std::io::Write;

        let mut tmp = tempfile::NamedTempFile::new().unwrap();
        for row in ["x,y", "1,2"] {
            writeln!(tmp, "{row}").unwrap();
        }
        tmp.flush().unwrap();

        let path = tmp.path();
        let file = tokio::fs::File::open(path).await.unwrap();
        let on_disk_len = std::fs::metadata(path).unwrap().len();
        let meta = AsyncSourceInfo::new(path.display().to_string(), FileFormat::Csv)
            .size_hint(Some(on_disk_len));

        let mut reader = (file, meta).into_async_read().await.unwrap();
        let mut text = String::new();
        reader.read_to_string(&mut text).await.unwrap();
        assert!(text.contains("x,y"));
        assert!(text.contains("1,2"));
    }
}