use crate::dataframe::DataFrame;
use crate::series::Series;
use crate::types::DataType;
use crate::VeloxxError;
use std::path::Path;
/// Asynchronous reader that loads Parquet files into [`DataFrame`]s.
///
/// The type carries no configuration. When the `advanced_io` feature is
/// disabled it only holds a zero-sized placeholder field, and its read
/// methods return an error instead of touching the file system.
#[derive(Debug, Clone)]
pub struct ParquetReader {
    /// Zero-sized placeholder so the struct keeps a field without `advanced_io`.
    #[cfg(not(feature = "advanced_io"))]
    _phantom: std::marker::PhantomData<()>,
}
impl ParquetReader {
pub fn new() -> Self {
Self {
#[cfg(not(feature = "advanced_io"))]
_phantom: std::marker::PhantomData,
}
}
#[cfg(feature = "advanced_io")]
pub async fn read_dataframe<P: AsRef<Path>>(&self, path: P) -> Result<DataFrame, VeloxxError> {
let path_buf = path.as_ref().to_path_buf();
let path_str = path_buf.to_string_lossy().to_string();
tokio::task::spawn_blocking(move || crate::io::arrow::read_parquet_to_dataframe(&path_str))
.await
.map_err(|e| VeloxxError::InvalidOperation(format!("Task join error: {}", e)))?
}
#[cfg(not(feature = "advanced_io"))]
pub async fn read_dataframe<P: AsRef<Path>>(&self, _path: P) -> Result<DataFrame, VeloxxError> {
Err(VeloxxError::InvalidOperation(
"Advanced I/O feature is not enabled. Enable with --features advanced_io".to_string(),
))
}
#[cfg(feature = "advanced_io")]
pub async fn read_dataframe_streaming<P: AsRef<Path>>(
&self,
path: P,
_batch_size: usize,
) -> Result<Vec<DataFrame>, VeloxxError> {
let df = self.read_dataframe(path).await?;
Ok(vec![df])
}
#[cfg(not(feature = "advanced_io"))]
pub async fn read_dataframe_streaming<P: AsRef<Path>>(
&self,
_path: P,
_batch_size: usize,
) -> Result<Vec<DataFrame>, VeloxxError> {
Err(VeloxxError::InvalidOperation(
"Advanced I/O feature is not enabled. Enable with --features advanced_io".to_string(),
))
}
}
impl Default for ParquetReader {
fn default() -> Self {
Self::new()
}
}
/// Asynchronous writer that stores [`DataFrame`]s as Parquet files.
///
/// The type carries no configuration. When the `advanced_io` feature is
/// disabled it only holds a zero-sized placeholder field, and its write
/// methods return an error instead of touching the file system.
#[derive(Debug, Clone)]
pub struct ParquetWriter {
    /// Zero-sized placeholder so the struct keeps a field without `advanced_io`.
    #[cfg(not(feature = "advanced_io"))]
    _phantom: std::marker::PhantomData<()>,
}
impl ParquetWriter {
pub fn new() -> Self {
Self {
#[cfg(not(feature = "advanced_io"))]
_phantom: std::marker::PhantomData,
}
}
#[cfg(feature = "advanced_io")]
pub async fn write_dataframe<P: AsRef<Path>>(
&self,
dataframe: &DataFrame,
path: P,
) -> Result<(), VeloxxError> {
let path_buf = path.as_ref().to_path_buf();
let path_str = path_buf.to_string_lossy().to_string();
let df_clone = dataframe.clone();
tokio::task::spawn_blocking(move || {
crate::io::arrow::write_parquet_from_dataframe(&df_clone, &path_str)
})
.await
.map_err(|e| VeloxxError::InvalidOperation(format!("Task join error: {}", e)))?
}
#[cfg(not(feature = "advanced_io"))]
pub async fn write_dataframe<P: AsRef<Path>>(
&self,
_dataframe: &DataFrame,
_path: P,
) -> Result<(), VeloxxError> {
Err(VeloxxError::InvalidOperation(
"Advanced I/O feature is not enabled. Enable with --features advanced_io".to_string(),
))
}
#[cfg(feature = "advanced_io")]
pub async fn write_dataframe_compressed<P: AsRef<Path>>(
&self,
dataframe: &DataFrame,
path: P,
_compression: CompressionType,
) -> Result<(), VeloxxError> {
self.write_dataframe(dataframe, path).await
}
#[cfg(not(feature = "advanced_io"))]
pub async fn write_dataframe_compressed<P: AsRef<Path>>(
&self,
_dataframe: &DataFrame,
_path: P,
_compression: CompressionType,
) -> Result<(), VeloxxError> {
Err(VeloxxError::InvalidOperation(
"Advanced I/O feature is not enabled. Enable with --features advanced_io".to_string(),
))
}
}
impl Default for ParquetWriter {
fn default() -> Self {
Self::new()
}
}
/// Compression codec that can be requested when writing Parquet data.
///
/// The enum is a plain, field-less selector, so it is comparable and hashable
/// in addition to being `Copy`.
///
/// NOTE(review): the only consumer visible in this module,
/// `ParquetWriter::write_dataframe_compressed`, currently ignores the value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CompressionType {
    None,
    Snappy,
    Gzip,
    Lzo,
    Brotli,
    Lz4,
    Zstd,
}
/// Entry point for loading JSON text as a sequence of [`DataFrame`]s.
///
/// The type carries no configuration. When the `advanced_io` feature is
/// disabled it only holds a zero-sized placeholder field, and its methods
/// return an error.
#[derive(Debug, Clone)]
pub struct JsonStreamer {
    /// Zero-sized placeholder so the struct keeps a field without `advanced_io`.
    #[cfg(not(feature = "advanced_io"))]
    _phantom: std::marker::PhantomData<()>,
}
impl JsonStreamer {
pub fn new() -> Self {
Self {
#[cfg(not(feature = "advanced_io"))]
_phantom: std::marker::PhantomData,
}
}
#[cfg(feature = "advanced_io")]
pub async fn stream_from_file<P: AsRef<Path>>(
&self,
path: P,
batch_size: usize,
) -> Result<Vec<DataFrame>, VeloxxError> {
let content = tokio::fs::read_to_string(path).await.map_err(|e| {
VeloxxError::InvalidOperation(format!("Failed to read JSON file: {}", e))
})?;
self.stream_from_string(&content, batch_size).await
}
#[cfg(not(feature = "advanced_io"))]
pub async fn stream_from_file<P: AsRef<Path>>(
&self,
_path: P,
_batch_size: usize,
) -> Result<Vec<DataFrame>, VeloxxError> {
Err(VeloxxError::InvalidOperation(
"Advanced I/O feature is not enabled. Enable with --features advanced_io".to_string(),
))
}
#[cfg(feature = "advanced_io")]
pub async fn stream_from_string(
&self,
json_str: &str,
_batch_size: usize,
) -> Result<Vec<DataFrame>, VeloxxError> {
let mut columns = indexmap::IndexMap::new();
columns.insert(
"json_data".to_string(),
Series::new_string("json_data", vec![Some(json_str.to_string())]),
);
let df = DataFrame::new(columns);
Ok(vec![df])
}
#[cfg(not(feature = "advanced_io"))]
pub async fn stream_from_string(
&self,
_json_str: &str,
_batch_size: usize,
) -> Result<Vec<DataFrame>, VeloxxError> {
Err(VeloxxError::InvalidOperation(
"Advanced I/O feature is not enabled. Enable with --features advanced_io".to_string(),
))
}
}
impl Default for JsonStreamer {
fn default() -> Self {
Self::new()
}
}
/// Placeholder database connector.
///
/// With `advanced_io` enabled it remembers the connection string it was
/// created with; otherwise it is an empty shell whose methods return errors.
pub struct DatabaseConnector {
    /// Zero-sized placeholder so the struct keeps a field without `advanced_io`.
    #[cfg(not(feature = "advanced_io"))]
    _phantom: std::marker::PhantomData<()>,
    /// Connection string passed to `new`; only echoed by the placeholder methods.
    #[cfg(feature = "advanced_io")]
    connection_string: String,
}
impl DatabaseConnector {
/// Creates a connector for `connection_string`.
///
/// The string is copied into the connector only when the `advanced_io`
/// feature is enabled. No connection is opened here.
pub fn new(connection_string: &str) -> Self {
    // Without `advanced_io` the struct has no field for the string; consume
    // it explicitly so the default build does not warn about an unused
    // parameter.
    #[cfg(not(feature = "advanced_io"))]
    let _ = connection_string;
    Self {
        #[cfg(feature = "advanced_io")]
        connection_string: connection_string.to_string(),
        #[cfg(not(feature = "advanced_io"))]
        _phantom: std::marker::PhantomData,
    }
}
/// Placeholder for running `query`.
///
/// No database is contacted. The result is a frame with a single string
/// column `query_result` whose only value is a message naming the query and
/// the stored connection string.
#[cfg(feature = "advanced_io")]
pub async fn query(&self, query: &str) -> Result<DataFrame, VeloxxError> {
    let message = format!("Executed '{}' on {}", query, self.connection_string);
    let result_column = Series::new_string("query_result", vec![Some(message)]);
    let mut columns = indexmap::IndexMap::new();
    columns.insert("query_result".to_string(), result_column);
    Ok(DataFrame::new(columns))
}

/// Fallback used when `advanced_io` is disabled.
///
/// # Errors
/// Always returns `VeloxxError::InvalidOperation`.
#[cfg(not(feature = "advanced_io"))]
pub async fn query(&self, _query: &str) -> Result<DataFrame, VeloxxError> {
    let reason = "Advanced I/O feature is not enabled. Enable with --features advanced_io";
    Err(VeloxxError::InvalidOperation(reason.to_string()))
}
/// Placeholder for inserting a frame into `table_name`.
///
/// Nothing is written: the method prints a description of what would happen
/// to standard output and returns `Ok(())`. `_dataframe` is not inspected.
#[cfg(feature = "advanced_io")]
pub async fn insert_dataframe(
    &self,
    _dataframe: &DataFrame,
    table_name: &str,
) -> Result<(), VeloxxError> {
    // NOTE(review): the connection string is echoed verbatim; if it can carry
    // credentials this line exposes them on stdout.
    let connection = &self.connection_string;
    println!(
        "Would insert DataFrame into table '{}' using connection: {}",
        table_name, connection
    );
    Ok(())
}

/// Fallback used when `advanced_io` is disabled.
///
/// # Errors
/// Always returns `VeloxxError::InvalidOperation`.
#[cfg(not(feature = "advanced_io"))]
pub async fn insert_dataframe(
    &self,
    _dataframe: &DataFrame,
    _table_name: &str,
) -> Result<(), VeloxxError> {
    let reason = "Advanced I/O feature is not enabled. Enable with --features advanced_io";
    Err(VeloxxError::InvalidOperation(reason.to_string()))
}
/// Placeholder for creating a table that mirrors `dataframe`'s schema.
///
/// Builds a `CREATE TABLE` statement with one column per frame column and
/// prints it together with the connection string; nothing is executed.
/// Column types are mapped as: `I32` -> `INTEGER`, `F64` -> `REAL`,
/// `Bool` -> `BOOLEAN`, `String` -> `TEXT`, `DateTime` -> `DATETIME`.
///
/// NOTE(review): table and column names are interpolated unquoted; they must
/// be quoted or validated before this statement is ever sent to a database.
#[cfg(feature = "advanced_io")]
pub async fn create_table_from_dataframe(
    &self,
    dataframe: &DataFrame,
    table_name: &str,
) -> Result<(), VeloxxError> {
    let column_names = dataframe.column_names();
    // Collect the definitions first and join them afterwards, so a column
    // whose lookup fails can never leave a dangling ", " in the statement.
    let column_defs: Vec<String> = column_names
        .iter()
        .filter_map(|column_name| {
            dataframe.get_column(column_name).map(|series| {
                let sql_type = match series.data_type() {
                    DataType::I32 => "INTEGER",
                    DataType::F64 => "REAL",
                    DataType::Bool => "BOOLEAN",
                    DataType::String => "TEXT",
                    DataType::DateTime => "DATETIME",
                };
                format!("{} {}", column_name, sql_type)
            })
        })
        .collect();
    let create_sql = format!("CREATE TABLE {} ({})", table_name, column_defs.join(", "));
    println!(
        "Would execute on {}: {}",
        self.connection_string, create_sql
    );
    Ok(())
}

/// Fallback used when `advanced_io` is disabled.
///
/// # Errors
/// Always returns `VeloxxError::InvalidOperation`.
#[cfg(not(feature = "advanced_io"))]
pub async fn create_table_from_dataframe(
    &self,
    _dataframe: &DataFrame,
    _table_name: &str,
) -> Result<(), VeloxxError> {
    Err(VeloxxError::InvalidOperation(
        "Advanced I/O feature is not enabled. Enable with --features advanced_io".to_string(),
    ))
}
}
/// Namespace for asynchronous CSV and JSON file helpers.
///
/// All functions are associated functions (no `self`); without the
/// `advanced_io` feature each public one returns an error.
pub struct AsyncFileOps;
impl AsyncFileOps {
/// Reads the CSV file at `path` into a [`DataFrame`].
///
/// The first record is used as the header; all remaining records become
/// rows. Quoted fields are supported: a field wrapped in double quotes may
/// contain commas, line breaks and doubled quotes (`""`).
///
/// # Errors
/// Returns `VeloxxError::InvalidOperation` when the file cannot be read or is
/// empty, and propagates any error from `DataFrame::from_vec_of_vec`.
#[cfg(feature = "advanced_io")]
pub async fn read_csv_async<P: AsRef<Path>>(path: P) -> Result<DataFrame, VeloxxError> {
    let content = tokio::fs::read_to_string(path).await.map_err(|e| {
        VeloxxError::InvalidOperation(format!("Failed to read CSV file: {}", e))
    })?;
    Self::parse_csv_from_string(&content)
}

/// Parses CSV text: first record is the header, the rest are data rows.
#[cfg(feature = "advanced_io")]
fn parse_csv_from_string(content: &str) -> Result<DataFrame, VeloxxError> {
    let mut records = Self::split_csv_records(content).into_iter();
    let column_names = match records.next() {
        Some(header) => header,
        None => {
            return Err(VeloxxError::InvalidOperation(
                "CSV content is empty".to_string(),
            ))
        }
    };
    let data_rows: Vec<Vec<String>> = records.collect();
    DataFrame::from_vec_of_vec(data_rows, column_names)
}

/// Splits CSV text into records of fields.
///
/// Unquoted fields are trimmed, as before. A field that starts with `"`
/// (optionally after whitespace) is read verbatim up to the closing quote,
/// with `""` standing for a literal quote; commas and line breaks inside the
/// quotes belong to the field. A blank line still yields a record with one
/// empty field, and a trailing newline does not create an extra record.
#[cfg(feature = "advanced_io")]
fn split_csv_records(content: &str) -> Vec<Vec<String>> {
    let mut records: Vec<Vec<String>> = Vec::new();
    let mut record: Vec<String> = Vec::new();
    let mut field = String::new();
    let mut in_quotes = false; // currently between an opening and closing quote
    let mut was_quoted = false; // the current field had a quoted section
    let mut pending = false; // characters seen since the last record ended
    let mut chars = content.chars().peekable();

    while let Some(c) = chars.next() {
        pending = true;
        if in_quotes {
            if c != '"' {
                field.push(c);
            } else if chars.peek() == Some(&'"') {
                // Doubled quote inside a quoted field is one literal quote.
                chars.next();
                field.push('"');
            } else {
                in_quotes = false;
            }
            continue;
        }
        match c {
            // A quote only opens a quoted field at the start of the field.
            '"' if !was_quoted && field.trim().is_empty() => {
                field.clear();
                in_quotes = true;
                was_quoted = true;
            }
            ',' | '\n' => {
                record.push(Self::take_csv_field(&mut field, was_quoted));
                was_quoted = false;
                if c == '\n' {
                    records.push(std::mem::take(&mut record));
                    pending = false;
                }
            }
            // Padding (including the '\r' of "\r\n") after a closing quote.
            _ if was_quoted && c.is_whitespace() => {}
            _ => field.push(c),
        }
    }
    if pending {
        record.push(Self::take_csv_field(&mut field, was_quoted));
        records.push(record);
    }
    records
}

/// Takes the accumulated field text, trimming it unless it was quoted.
#[cfg(feature = "advanced_io")]
fn take_csv_field(field: &mut String, quoted: bool) -> String {
    let raw = std::mem::take(field);
    if quoted {
        raw
    } else {
        raw.trim().to_string()
    }
}

/// Fallback used when `advanced_io` is disabled.
///
/// # Errors
/// Always returns `VeloxxError::InvalidOperation`.
#[cfg(not(feature = "advanced_io"))]
pub async fn read_csv_async<P: AsRef<Path>>(_path: P) -> Result<DataFrame, VeloxxError> {
    Err(VeloxxError::InvalidOperation(
        "Advanced I/O feature is not enabled. Enable with --features advanced_io".to_string(),
    ))
}

/// Writes `dataframe` to `path` as CSV (header line followed by one line per row).
///
/// # Errors
/// Returns `VeloxxError::InvalidOperation` when serialization fails or the
/// file cannot be written.
#[cfg(feature = "advanced_io")]
pub async fn write_csv_async<P: AsRef<Path>>(
    dataframe: &DataFrame,
    path: P,
) -> Result<(), VeloxxError> {
    let csv_content = Self::dataframe_to_csv_string(dataframe)?;
    tokio::fs::write(path, csv_content).await.map_err(|e| {
        VeloxxError::InvalidOperation(format!("Failed to write CSV file: {}", e))
    })?;
    Ok(())
}

/// Serializes `dataframe` to CSV text.
///
/// Null and missing values become empty fields. Header names and values are
/// quoted when they contain a comma, a double quote, a line break, or
/// leading/trailing whitespace, so the output can be split back into the
/// same fields.
#[cfg(feature = "advanced_io")]
fn dataframe_to_csv_string(dataframe: &DataFrame) -> Result<String, VeloxxError> {
    let col_names_owned = dataframe.column_names();
    let column_names: Vec<&str> = col_names_owned.iter().map(|s| s.as_str()).collect();

    // Resolve every column once instead of once per cell; a missing column is
    // reported as an error rather than a panic.
    let mut columns = Vec::with_capacity(column_names.len());
    for col_name in column_names.iter() {
        let series = dataframe.get_column(col_name).ok_or_else(|| {
            VeloxxError::InvalidOperation(format!(
                "Column '{}' is missing from the DataFrame",
                col_name
            ))
        })?;
        columns.push(series);
    }

    let header: Vec<String> = column_names
        .iter()
        .map(|name| Self::escape_csv_field(name))
        .collect();
    let mut csv_content = header.join(",");
    csv_content.push('\n');

    for i in 0..dataframe.row_count() {
        let mut row_values: Vec<String> = Vec::with_capacity(columns.len());
        for series in columns.iter() {
            let value_str = match series.get_value(i) {
                Some(crate::types::Value::I32(v)) => v.to_string(),
                Some(crate::types::Value::F64(v)) => v.to_string(),
                Some(crate::types::Value::Bool(v)) => v.to_string(),
                Some(crate::types::Value::String(v)) => v,
                Some(crate::types::Value::DateTime(v)) => v.to_string(),
                Some(crate::types::Value::Null) => String::new(),
                None => String::new(),
            };
            row_values.push(Self::escape_csv_field(&value_str));
        }
        csv_content.push_str(&row_values.join(","));
        csv_content.push('\n');
    }
    Ok(csv_content)
}

/// Quotes `value` for CSV output when it would otherwise be misread.
#[cfg(feature = "advanced_io")]
fn escape_csv_field(value: &str) -> String {
    let has_special = value
        .chars()
        .any(|c| matches!(c, ',' | '"' | '\n' | '\r'));
    // Unquoted fields are trimmed on read, so padded values need quotes too.
    if has_special || value.trim() != value {
        format!("\"{}\"", value.replace('"', "\"\""))
    } else {
        value.to_string()
    }
}

/// Fallback used when `advanced_io` is disabled.
///
/// # Errors
/// Always returns `VeloxxError::InvalidOperation`.
#[cfg(not(feature = "advanced_io"))]
pub async fn write_csv_async<P: AsRef<Path>>(
    _dataframe: &DataFrame,
    _path: P,
) -> Result<(), VeloxxError> {
    Err(VeloxxError::InvalidOperation(
        "Advanced I/O feature is not enabled. Enable with --features advanced_io".to_string(),
    ))
}
/// Reads the JSON file at `path` into a [`DataFrame`].
///
/// The path is converted to a string lossily and handed to
/// `crate::io::json::UltraFastJsonParser::read_file`, which runs inside
/// `tokio::task::spawn_blocking`.
///
/// # Errors
/// Propagates the parser's error, or returns
/// `VeloxxError::InvalidOperation` ("Task join error: ...") when the blocking
/// task could not be joined.
#[cfg(feature = "advanced_io")]
pub async fn read_json_async<P: AsRef<Path>>(path: P) -> Result<DataFrame, VeloxxError> {
    let location = path.as_ref().to_string_lossy().into_owned();
    let joined = tokio::task::spawn_blocking(move || {
        crate::io::json::UltraFastJsonParser::new().read_file(&location)
    })
    .await;
    match joined {
        Ok(outcome) => outcome,
        Err(e) => Err(VeloxxError::InvalidOperation(format!(
            "Task join error: {}",
            e
        ))),
    }
}

/// Fallback used when `advanced_io` is disabled.
///
/// # Errors
/// Always returns `VeloxxError::InvalidOperation`.
#[cfg(not(feature = "advanced_io"))]
pub async fn read_json_async<P: AsRef<Path>>(_path: P) -> Result<DataFrame, VeloxxError> {
    let reason = "Advanced I/O feature is not enabled. Enable with --features advanced_io";
    Err(VeloxxError::InvalidOperation(reason.to_string()))
}
/// Writes `dataframe` to `path` as a JSON array of row objects.
///
/// # Errors
/// Returns `VeloxxError::InvalidOperation` when serialization fails or the
/// file cannot be written.
#[cfg(feature = "advanced_io")]
pub async fn write_json_async<P: AsRef<Path>>(
    dataframe: &DataFrame,
    path: P,
) -> Result<(), VeloxxError> {
    let json_content = Self::dataframe_to_json_string(dataframe)?;
    tokio::fs::write(path, json_content).await.map_err(|e| {
        VeloxxError::InvalidOperation(format!("Failed to write JSON file: {}", e))
    })?;
    Ok(())
}

/// Serializes `dataframe` as a JSON array with one object per row.
///
/// Column names and string values are escaped as JSON strings. Null and
/// missing values, as well as non-finite floats (which JSON cannot
/// represent), are written as `null`.
#[cfg(feature = "advanced_io")]
fn dataframe_to_json_string(dataframe: &DataFrame) -> Result<String, VeloxxError> {
    // Resolve each column and render its `"name":` prefix once, not per row.
    let column_names = dataframe.column_names();
    let mut columns = Vec::with_capacity(column_names.len());
    for column_name in column_names.iter() {
        let series = dataframe.get_column(column_name).ok_or_else(|| {
            VeloxxError::InvalidOperation(format!(
                "Column '{}' is missing from the DataFrame",
                column_name
            ))
        })?;
        let mut key = String::new();
        Self::push_json_string(&mut key, column_name);
        key.push(':');
        columns.push((key, series));
    }

    let mut json_content = String::from("[\n");
    for i in 0..dataframe.row_count() {
        if i > 0 {
            json_content.push_str(",\n");
        }
        json_content.push_str(" {");
        for (position, (key, series)) in columns.iter().enumerate() {
            if position > 0 {
                json_content.push_str(", ");
            }
            json_content.push_str(key);
            match series.get_value(i) {
                Some(crate::types::Value::I32(v)) => json_content.push_str(&v.to_string()),
                Some(crate::types::Value::F64(v)) if v.is_finite() => {
                    json_content.push_str(&v.to_string())
                }
                // NaN and infinities have no JSON representation.
                Some(crate::types::Value::F64(_)) => json_content.push_str("null"),
                Some(crate::types::Value::Bool(v)) => json_content.push_str(&v.to_string()),
                Some(crate::types::Value::String(v)) => {
                    Self::push_json_string(&mut json_content, &v)
                }
                // NOTE(review): emitted unquoted; assumes the DateTime payload
                // displays as a valid JSON number — TODO confirm.
                Some(crate::types::Value::DateTime(v)) => json_content.push_str(&v.to_string()),
                Some(crate::types::Value::Null) => json_content.push_str("null"),
                None => json_content.push_str("null"),
            }
        }
        json_content.push('}');
    }
    json_content.push_str("\n]");
    Ok(json_content)
}

/// Appends `text` to `out` as a double-quoted, escaped JSON string.
#[cfg(feature = "advanced_io")]
fn push_json_string(out: &mut String, text: &str) {
    out.push('"');
    for c in text.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            // Remaining control characters must use the \uXXXX form.
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
}

/// Fallback used when `advanced_io` is disabled.
///
/// # Errors
/// Always returns `VeloxxError::InvalidOperation`.
#[cfg(not(feature = "advanced_io"))]
pub async fn write_json_async<P: AsRef<Path>>(
    _dataframe: &DataFrame,
    _path: P,
) -> Result<(), VeloxxError> {
    Err(VeloxxError::InvalidOperation(
        "Advanced I/O feature is not enabled. Enable with --features advanced_io".to_string(),
    ))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // The creation tests only check that each constructor yields a value of
    // the expected type (its size equals the size of the type).

    #[test]
    fn test_parquet_reader_creation() {
        let expected = std::mem::size_of::<ParquetReader>();
        let reader = ParquetReader::new();
        assert_eq!(std::mem::size_of_val(&reader), expected);
    }

    #[test]
    fn test_parquet_writer_creation() {
        let expected = std::mem::size_of::<ParquetWriter>();
        let writer = ParquetWriter::new();
        assert_eq!(std::mem::size_of_val(&writer), expected);
    }

    #[test]
    fn test_json_streamer_creation() {
        let expected = std::mem::size_of::<JsonStreamer>();
        let streamer = JsonStreamer::new();
        assert_eq!(std::mem::size_of_val(&streamer), expected);
    }

    #[test]
    fn test_database_connector_creation() {
        let expected = std::mem::size_of::<DatabaseConnector>();
        let connector = DatabaseConnector::new("sqlite://test.db");
        assert_eq!(std::mem::size_of_val(&connector), expected);
    }

    /// Without `advanced_io` every read must fail; with it the outcome depends
    /// on the file system, so the result is only consumed.
    #[tokio::test]
    async fn test_advanced_io_without_feature() {
        let outcome = ParquetReader::new().read_dataframe("test.parquet").await;
        if cfg!(feature = "advanced_io") {
            let _ = outcome;
        } else {
            assert!(outcome.is_err());
        }
    }
}