#[cfg(feature = "distributed")]
mod tests {
use pandrs::distributed::expr::{
ColumnMeta, ColumnProjection, Expr, ExprDataType, ExprSchema, ExprValidator, InferredType,
UdfDefinition,
};
use pandrs::error::Result;
fn create_test_schema() -> ExprSchema {
let mut schema = ExprSchema::new();
schema
.add_column(ColumnMeta::new("id", ExprDataType::Integer, false, None))
.add_column(ColumnMeta::new("name", ExprDataType::String, false, None))
.add_column(ColumnMeta::new("price", ExprDataType::Float, false, None))
.add_column(ColumnMeta::new(
"quantity",
ExprDataType::Integer,
false,
None,
))
.add_column(ColumnMeta::new(
"in_stock",
ExprDataType::Boolean,
false,
None,
))
.add_column(ColumnMeta::new(
"created_at",
ExprDataType::Timestamp,
false,
None,
))
.add_column(ColumnMeta::new(
"nullable_value",
ExprDataType::Float,
true,
None,
));
schema
}
#[test]
#[allow(clippy::result_large_err)]
fn test_column_reference_validation() -> Result<()> {
let schema = create_test_schema();
let validator = ExprValidator::new(&schema);
let expr = Expr::col("id");
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Integer);
assert!(!result.nullable);
let expr = Expr::col("nonexistent");
assert!(validator.validate_expr(&expr).is_err());
Ok(())
}
#[test]
#[allow(clippy::result_large_err)]
fn test_literal_validation() -> Result<()> {
let schema = create_test_schema();
let validator = ExprValidator::new(&schema);
let expr = Expr::lit(42);
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Integer);
assert!(!result.nullable);
let expr = Expr::lit(42.5);
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Float);
assert!(!result.nullable);
let expr = Expr::lit("test");
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::String);
assert!(!result.nullable);
let expr = Expr::lit(true);
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Boolean);
assert!(!result.nullable);
Ok(())
}
#[test]
#[allow(clippy::result_large_err)]
fn test_arithmetic_validation() -> Result<()> {
let schema = create_test_schema();
let validator = ExprValidator::new(&schema);
let expr = Expr::col("id").add(Expr::col("quantity"));
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Integer);
let expr = Expr::col("id").add(Expr::col("price"));
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Float);
let expr = Expr::col("price").mul(Expr::col("quantity"));
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Float);
let expr = Expr::col("id").div(Expr::col("quantity"));
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Float);
let expr = Expr::col("name").add(Expr::col("name"));
assert!(validator.validate_expr(&expr).is_err());
let expr = Expr::col("name").mul(Expr::col("id"));
assert!(validator.validate_expr(&expr).is_err());
Ok(())
}
#[test]
#[allow(clippy::result_large_err)]
fn test_comparison_validation() -> Result<()> {
let schema = create_test_schema();
let validator = ExprValidator::new(&schema);
let expr = Expr::col("id").eq(Expr::col("quantity"));
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Boolean);
let expr = Expr::col("name").eq(Expr::lit("test"));
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Boolean);
let expr = Expr::col("price").gt(Expr::col("id"));
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Boolean);
let expr = Expr::col("name").lt(Expr::col("price"));
assert!(validator.validate_expr(&expr).is_err());
Ok(())
}
#[test]
#[allow(clippy::result_large_err)]
fn test_logical_validation() -> Result<()> {
let schema = create_test_schema();
let validator = ExprValidator::new(&schema);
let expr = Expr::col("in_stock").and(Expr::col("in_stock"));
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Boolean);
let expr = Expr::col("in_stock").or(Expr::col("in_stock"));
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Boolean);
let expr = Expr::col("id").and(Expr::col("in_stock"));
assert!(validator.validate_expr(&expr).is_err());
Ok(())
}
#[test]
#[allow(clippy::result_large_err)]
fn test_function_validation() -> Result<()> {
let schema = create_test_schema();
let mut validator = ExprValidator::new(&schema);
validator.add_udf(
"calculate_total",
ExprDataType::Float,
vec![ExprDataType::Float, ExprDataType::Integer],
);
let expr = Expr::call(
"calculate_total",
vec![Expr::col("price"), Expr::col("quantity")],
);
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Float);
let expr = Expr::call("unknown_function", vec![Expr::col("id")]);
assert!(validator.validate_expr(&expr).is_err());
let expr = Expr::call(
"calculate_total",
vec![Expr::col("name"), Expr::col("quantity")],
);
assert!(validator.validate_expr(&expr).is_err());
let expr = Expr::call("calculate_total", vec![Expr::col("price")]);
assert!(validator.validate_expr(&expr).is_err());
Ok(())
}
#[test]
#[allow(clippy::result_large_err)]
fn test_case_validation() -> Result<()> {
let schema = create_test_schema();
let validator = ExprValidator::new(&schema);
let expr = Expr::case(
vec![
(Expr::col("price").lt(Expr::lit(50.0)), Expr::lit("Low")),
(Expr::col("price").lt(Expr::lit(100.0)), Expr::lit("Medium")),
],
Some(Expr::lit("High")),
);
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::String);
let expr = Expr::case(
vec![
(Expr::col("price").lt(Expr::lit(50.0)), Expr::lit("Low")),
(Expr::col("price").lt(Expr::lit(100.0)), Expr::lit(42)),
],
None,
);
assert!(validator.validate_expr(&expr).is_err());
let expr = Expr::case(
vec![
(Expr::col("price"), Expr::lit("Low")),
(Expr::col("quantity"), Expr::lit("High")),
],
None,
);
assert!(validator.validate_expr(&expr).is_err());
Ok(())
}
#[test]
#[allow(clippy::result_large_err)]
fn test_cast_validation() -> Result<()> {
let schema = create_test_schema();
let validator = ExprValidator::new(&schema);
let expr = Expr::col("id").to_float();
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Float);
let expr = Expr::col("price").to_integer();
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Integer);
let expr = Expr::col("id").to_string();
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::String);
let expr = Expr::col("in_stock").to_string();
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::String);
let expr = Expr::cast(Expr::col("in_stock"), ExprDataType::Date);
assert!(validator.validate_expr(&expr).is_err());
Ok(())
}
#[test]
#[allow(clippy::result_large_err)]
fn test_coalesce_validation() -> Result<()> {
let schema = create_test_schema();
let validator = ExprValidator::new(&schema);
let expr = Expr::coalesce(vec![Expr::col("nullable_value"), Expr::lit(0.0)]);
let result = validator.validate_expr(&expr)?;
assert_eq!(result.data_type, ExprDataType::Float);
assert!(!result.nullable);
let expr = Expr::coalesce(vec![Expr::col("nullable_value"), Expr::col("id")]);
assert!(validator.validate_expr(&expr).is_err());
Ok(())
}
#[test]
#[allow(clippy::result_large_err)]
fn test_projection_validation() -> Result<()> {
let schema = create_test_schema();
let validator = ExprValidator::new(&schema);
let projections = vec![
ColumnProjection::column("id"),
ColumnProjection::with_alias(
Expr::col("price").mul(Expr::col("quantity")),
"total_value",
),
ColumnProjection::with_alias(Expr::col("price").gt(Expr::lit(100.0)), "is_expensive"),
];
let result = validator.validate_projections(&projections)?;
assert_eq!(result.len(), 3);
assert_eq!(result.get("id").unwrap().data_type, ExprDataType::Integer);
assert_eq!(
result.get("total_value").unwrap().data_type,
ExprDataType::Float
);
assert_eq!(
result.get("is_expensive").unwrap().data_type,
ExprDataType::Boolean
);
let projections = [ColumnProjection::column("nonexistent")];
assert!(validator.validate_projections(&projections).is_err());
Ok(())
}
#[test]
#[allow(clippy::result_large_err)]
fn test_arrow_schema_conversion() -> Result<()> {
let schema = create_test_schema();
let arrow_schema = schema.to_arrow_schema()?;
assert_eq!(arrow_schema.fields().len(), schema.len());
let id_field = arrow_schema.field_with_name("id").unwrap();
assert_eq!(id_field.data_type(), &arrow::datatypes::DataType::Int64);
let name_field = arrow_schema.field_with_name("name").unwrap();
assert_eq!(name_field.data_type(), &arrow::datatypes::DataType::Utf8);
let price_field = arrow_schema.field_with_name("price").unwrap();
assert_eq!(
price_field.data_type(),
&arrow::datatypes::DataType::Float64
);
let round_trip_schema = ExprSchema::from_arrow_schema(&arrow_schema)?;
assert_eq!(round_trip_schema.len(), schema.len());
let id_col = round_trip_schema.column("id").unwrap();
assert_eq!(id_col.data_type, ExprDataType::Integer);
let name_col = round_trip_schema.column("name").unwrap();
assert_eq!(name_col.data_type, ExprDataType::String);
let price_col = round_trip_schema.column("price").unwrap();
assert_eq!(price_col.data_type, ExprDataType::Float);
Ok(())
}
}