use std::collections::HashMap;
use pandrs::dataframe::DataFrame;
use pandrs::error::Result;
use pandrs::{BooleanColumn, Column, Float64Column, Int64Column, OptimizedDataFrame, StringColumn};
#[cfg(feature = "distributed")]
use pandrs::distributed::{DistributedConfig, DistributedContext};
#[cfg(feature = "parquet")]
use pandrs::io::parquet::{read_parquet, write_parquet, ParquetCompression};
/// Verifies column management on the standard `DataFrame`:
/// adding columns, map-based renaming, wholesale renaming, and
/// retrieving values after the renames.
#[test]
#[allow(clippy::result_large_err)]
fn test_dataframe_operations() -> Result<()> {
    let mut df = DataFrame::new();

    // Build a three-column frame of employee records.
    let names = vec!["Alice".to_string(), "Bob".to_string(), "Carol".to_string()];
    df.add_column(
        "name".to_string(),
        pandrs::series::Series::new(names, Some("name".to_string()))?,
    )?;
    df.add_column(
        "age".to_string(),
        pandrs::series::Series::new(vec![25, 30, 35], Some("age".to_string()))?,
    )?;
    df.add_column(
        "salary".to_string(),
        pandrs::series::Series::new(vec![50000, 60000, 70000], Some("salary".to_string()))?,
    )?;
    assert_eq!(df.column_names(), ["name", "age", "salary"]);
    assert_eq!(df.row_count(), 3);

    // Partial rename through a map: unmapped columns keep their names.
    let mut mapping = HashMap::new();
    mapping.insert("name".to_string(), "employee_name".to_string());
    mapping.insert("age".to_string(), "employee_age".to_string());
    df.rename_columns(&mapping)?;
    assert_eq!(df.column_names(), ["employee_name", "employee_age", "salary"]);
    assert!(df.contains_column("employee_name"));
    assert!(df.contains_column("employee_age"));
    assert!(!df.contains_column("name"));
    assert!(!df.contains_column("age"));

    // Wholesale rename: every column gets a new name, positionally.
    let replacement = vec![
        "worker_name".to_string(),
        "worker_age".to_string(),
        "worker_salary".to_string(),
    ];
    df.set_column_names(replacement)?;
    assert_eq!(
        df.column_names(),
        ["worker_name", "worker_age", "worker_salary"]
    );

    // Column data must survive both renames (ints come back stringified).
    assert_eq!(
        df.get_column_string_values("worker_name")?,
        ["Alice", "Bob", "Carol"]
    );
    assert_eq!(
        df.get_column_string_values("worker_age")?,
        ["25", "30", "35"]
    );
    Ok(())
}
/// Exercises the same rename/access surface on `OptimizedDataFrame`,
/// covering all four typed column variants (String, Int64, Float64, Boolean).
#[test]
#[allow(clippy::result_large_err)]
fn test_optimized_dataframe_operations() -> Result<()> {
    let mut df = OptimizedDataFrame::new();

    // One column per supported type.
    df.add_column(
        "name",
        Column::String(StringColumn::new(vec![
            "Alice".to_string(),
            "Bob".to_string(),
            "Carol".to_string(),
        ])),
    )?;
    df.add_column("age", Column::Int64(Int64Column::new(vec![25, 30, 35])))?;
    df.add_column(
        "salary",
        Column::Float64(Float64Column::new(vec![50000.0, 60000.0, 70000.0])),
    )?;
    df.add_column(
        "active",
        Column::Boolean(BooleanColumn::new(vec![true, false, true])),
    )?;
    assert_eq!(df.column_count(), 4);
    assert_eq!(df.row_count(), 3);

    // Map-based rename touching two of the four columns.
    let mut mapping = HashMap::new();
    mapping.insert("name".to_string(), "employee_name".to_string());
    mapping.insert("active".to_string(), "is_active".to_string());
    df.rename_columns(&mapping)?;
    for present in ["employee_name", "is_active"] {
        assert!(df.contains_column(present));
    }
    for absent in ["name", "active"] {
        assert!(!df.contains_column(absent));
    }

    // Replace every column name at once.
    df.set_column_names(vec![
        "emp_name".to_string(),
        "emp_age".to_string(),
        "emp_salary".to_string(),
        "emp_active".to_string(),
    ])?;
    assert_eq!(
        df.column_names(),
        ["emp_name", "emp_age", "emp_salary", "emp_active"]
    );

    // The typed representation must be preserved across renames.
    assert!(df.column("emp_name")?.as_string().is_some());
    assert!(df.column("emp_age")?.as_int64().is_some());
    assert!(df.column("emp_salary")?.as_float64().is_some());
    assert!(df.column("emp_active")?.as_boolean().is_some());
    Ok(())
}
/// Round-trips an `OptimizedDataFrame` through Parquet, checking that the
/// schema and data survive under both Snappy and Gzip compression.
#[cfg(feature = "parquet")]
#[test]
#[allow(clippy::result_large_err)]
fn test_enhanced_parquet_io() -> Result<()> {
    use std::fs::remove_file;
    use std::path::Path;

    // Stage files in the OS temp directory (as test_cross_feature_integration
    // already does) instead of the current working directory, so parallel or
    // aborted test runs neither collide on, nor litter, the repo checkout.
    let snappy_path = std::env::temp_dir().join("test_parquet.parquet");
    let test_file = snappy_path.to_str().expect("temp dir path is valid UTF-8");
    if Path::new(test_file).exists() {
        let _ = remove_file(test_file);
    }

    // Build a four-column product table covering all column types.
    let mut df = OptimizedDataFrame::new();
    let names = StringColumn::new(vec![
        "Product_A".to_string(),
        "Product_B".to_string(),
        "Product_C".to_string(),
        "Product_D".to_string(),
    ]);
    let quantities = Int64Column::new(vec![100, 250, 75, 300]);
    let prices = Float64Column::new(vec![19.99, 49.99, 9.99, 99.99]);
    let in_stock = BooleanColumn::new(vec![true, false, true, true]);
    df.add_column("product_name", Column::String(names))?;
    df.add_column("quantity", Column::Int64(quantities))?;
    df.add_column("price", Column::Float64(prices))?;
    df.add_column("in_stock", Column::Boolean(in_stock))?;

    // Snappy round-trip: schema and values must survive.
    write_parquet(&df, test_file, Some(ParquetCompression::Snappy))?;
    assert!(Path::new(test_file).exists());
    let loaded_df = read_parquet(test_file)?;
    assert_eq!(loaded_df.row_count(), 4);
    assert_eq!(loaded_df.column_names().len(), 4);
    assert!(loaded_df.contains_column("product_name"));
    assert!(loaded_df.contains_column("quantity"));
    assert!(loaded_df.contains_column("price"));
    assert!(loaded_df.contains_column("in_stock"));
    let product_data = loaded_df.get_column_string_values("product_name")?;
    assert!(product_data.contains(&"Product_A".to_string()));
    assert!(product_data.contains(&"Product_B".to_string()));
    let _ = remove_file(test_file);

    // Gzip round-trip on the same frame.
    let gzip_path = std::env::temp_dir().join("test_gzip.parquet");
    let gzip_file = gzip_path.to_str().expect("temp dir path is valid UTF-8");
    write_parquet(&df, gzip_file, Some(ParquetCompression::Gzip))?;
    assert!(Path::new(gzip_file).exists());
    let gzip_loaded_df = read_parquet(gzip_file)?;
    assert_eq!(gzip_loaded_df.row_count(), 4);
    let _ = remove_file(gzip_file);
    Ok(())
}
/// Registers a plain `DataFrame` with a DataFusion-backed distributed
/// context and confirms the dataset becomes visible.
#[cfg(feature = "distributed")]
#[test]
#[allow(clippy::result_large_err)]
fn test_distributed_processing_integration() -> Result<()> {
    // Six rows of regional sales spanning two quarters.
    let regions: Vec<String> = ["North", "South", "East", "West", "North", "South"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    let mut df = DataFrame::new();
    df.add_column(
        "region".to_string(),
        pandrs::series::Series::new(regions, Some("region".to_string()))?,
    )?;
    df.add_column(
        "sales".to_string(),
        pandrs::series::Series::new(
            vec![1000, 1500, 800, 1200, 900, 1100],
            Some("sales".to_string()),
        )?,
    )?;
    df.add_column(
        "quarter".to_string(),
        pandrs::series::Series::new(vec![1, 1, 1, 1, 2, 2], Some("quarter".to_string()))?,
    )?;

    // Two-way concurrency on the DataFusion executor.
    let config = DistributedConfig::new()
        .with_executor("datafusion")
        .with_concurrency(2);
    let mut context = DistributedContext::new(config)?;
    context.register_dataframe("sales_data", &df)?;

    // The dataset is registered and the source frame is untouched.
    assert!(context.get_dataset("sales_data").is_some());
    assert_eq!(df.row_count(), 6);
    assert_eq!(df.column_names().len(), 3);
    Ok(())
}
/// Combined pipeline: build an optimized frame, rename its columns,
/// persist to Parquet, reload, and register the result with a
/// distributed context.
#[cfg(all(feature = "distributed", feature = "parquet"))]
#[test]
#[allow(clippy::result_large_err)]
fn test_cross_feature_integration() -> Result<()> {
    use std::fs::remove_file;
    use std::path::Path;

    // Stage the Parquet file in the OS temp directory.
    let path_buf = std::env::temp_dir().join("test_cross_feature.parquet");
    let test_file = path_buf.to_str().expect("temp dir path");
    if Path::new(test_file).exists() {
        let _ = remove_file(test_file);
    }

    // Assemble a small employee table with abbreviated column names.
    let mut df = OptimizedDataFrame::new();
    df.add_column(
        "dept",
        Column::String(StringColumn::new(vec![
            "Engineering".to_string(),
            "Sales".to_string(),
            "Marketing".to_string(),
            "Engineering".to_string(),
            "Sales".to_string(),
        ])),
    )?;
    df.add_column(
        "emp",
        Column::String(StringColumn::new(vec![
            "Alice".to_string(),
            "Bob".to_string(),
            "Carol".to_string(),
            "David".to_string(),
            "Eve".to_string(),
        ])),
    )?;
    df.add_column(
        "sal",
        Column::Int64(Int64Column::new(vec![75000, 65000, 60000, 80000, 70000])),
    )?;

    // Expand the abbreviated names before persisting.
    let mut mapping = HashMap::new();
    mapping.insert("dept".to_string(), "department".to_string());
    mapping.insert("emp".to_string(), "employee".to_string());
    mapping.insert("sal".to_string(), "salary".to_string());
    df.rename_columns(&mapping)?;

    // Round-trip through Parquet, then hand the reloaded frame to the
    // distributed context.
    write_parquet(&df, test_file, Some(ParquetCompression::Snappy))?;
    let loaded_df = read_parquet(test_file)?;
    let config = DistributedConfig::new()
        .with_executor("datafusion")
        .with_concurrency(2);
    let mut context = DistributedContext::new(config)?;
    context.register_dataframe("employee_data", &loaded_df)?;

    assert!(context.get_dataset("employee_data").is_some());
    assert!(loaded_df.row_count() > 0);
    assert!(loaded_df.column_names().len() >= 3);
    let _ = remove_file(test_file);
    Ok(())
}
/// Checks that invalid rename requests are rejected while an empty
/// rename map is accepted as a no-op.
#[test]
#[allow(clippy::result_large_err)]
fn test_error_handling() -> Result<()> {
    let mut df = DataFrame::new();
    df.add_column(
        "col1".to_string(),
        pandrs::series::Series::new(vec![1, 2, 3], Some("col1".to_string()))?,
    )?;

    // Renaming a column that does not exist must fail.
    let mut missing = HashMap::new();
    missing.insert("nonexistent".to_string(), "new_name".to_string());
    assert!(df.rename_columns(&missing).is_err());

    // set_column_names with the wrong arity (two names, one column) must fail.
    assert!(df
        .set_column_names(vec!["name1".to_string(), "name2".to_string()])
        .is_err());

    // An empty rename map is a valid no-op.
    assert!(df.rename_columns(&HashMap::new()).is_ok());
    Ok(())
}
/// Sanity-checks that rename operations on a 1000-row frame complete
/// quickly and leave the frame's shape intact.
#[test]
#[allow(clippy::result_large_err)]
fn test_performance_characteristics() -> Result<()> {
    let size = 1000;
    let mut df = OptimizedDataFrame::new();

    // Build four typed columns of `size` rows each.
    let mut ids = Vec::with_capacity(size);
    let mut values = Vec::with_capacity(size);
    let mut categories = Vec::with_capacity(size);
    let mut flags = Vec::with_capacity(size);
    for i in 0..size {
        ids.push(i as i64);
        values.push((i as f64) * 1.5);
        categories.push(format!("Category_{}", i % 10));
        flags.push(i % 2 == 0);
    }
    df.add_column("id", Column::Int64(Int64Column::new(ids)))?;
    df.add_column("value", Column::Float64(Float64Column::new(values)))?;
    df.add_column("category", Column::String(StringColumn::new(categories)))?;
    df.add_column("flag", Column::Boolean(BooleanColumn::new(flags)))?;

    // A deliberately generous bound: it still catches a rename that
    // accidentally copies all row data, but unlike the earlier 100 ms
    // limit it should not fail spuriously when a loaded CI machine
    // stalls the test thread for a few scheduler quanta.
    const GENEROUS_LIMIT_MS: u128 = 1000;

    // Time the map-based rename of all four columns.
    let start = std::time::Instant::now();
    let mut rename_map = HashMap::new();
    rename_map.insert("id".to_string(), "identifier".to_string());
    rename_map.insert("value".to_string(), "metric".to_string());
    rename_map.insert("category".to_string(), "group".to_string());
    rename_map.insert("flag".to_string(), "active".to_string());
    df.rename_columns(&rename_map)?;
    assert!(start.elapsed().as_millis() < GENEROUS_LIMIT_MS);

    // Time the wholesale positional rename.
    let start = std::time::Instant::now();
    df.set_column_names(vec![
        "id_new".to_string(),
        "value_new".to_string(),
        "category_new".to_string(),
        "flag_new".to_string(),
    ])?;
    assert!(start.elapsed().as_millis() < GENEROUS_LIMIT_MS);

    // Data shape is unchanged by the renames.
    assert_eq!(df.row_count(), size);
    assert_eq!(df.column_count(), 4);
    Ok(())
}