#[cfg(feature = "distributed")]
use pandrs::distributed::expr::{ColumnProjection, Expr, ExprDataType, UdfDefinition};
#[cfg(feature = "distributed")]
use pandrs::distributed::DistributedContext;
#[cfg(feature = "distributed")]
use pandrs::error::Result;
#[cfg(feature = "distributed")]
#[allow(clippy::result_large_err)]
fn main() -> Result<()> {
let mut context = DistributedContext::new_local(4)?;
context.register_csv("sales", "examples/data/sales.csv")?;
let sales_df = context.dataset("sales")?;
println!("Original DataFrame:");
let result = sales_df.collect()?;
println!("{}", result);
println!("\n--- Example 1: Column Selection ---");
let selected = sales_df.select_expr(&[
ColumnProjection::column("region"),
ColumnProjection::column("sales"),
ColumnProjection::with_alias(Expr::col("sales").mul(Expr::lit(1.1)), "sales_with_bonus"),
])?;
println!("Selected columns with expression:");
let result = selected.collect()?;
println!("{}", result);
println!("\n--- Example 2: Complex Calculations ---");
let with_calcs = sales_df.with_column(
"profit_margin",
Expr::col("profit")
.div(Expr::col("sales"))
.mul(Expr::lit(100.0)),
)?;
println!("With calculated columns:");
let result = with_calcs.collect()?;
println!("{}", result);
println!("\n--- Example 3: Expression-based Filtering ---");
let high_margin = sales_df.filter_expr(
Expr::col("profit")
.div(Expr::col("sales"))
.mul(Expr::lit(100.0))
.gt(Expr::lit(15.0)),
)?;
println!("High margin sales (> 15%):");
let result = high_margin.collect()?;
println!("{}", result);
println!("\n--- Example 4: User Defined Functions ---");
let commission_udf = UdfDefinition::new(
"calculate_commission",
ExprDataType::Float,
vec![ExprDataType::Float, ExprDataType::Float],
"CASE
WHEN param1 / param0 > 0.2 THEN param0 * 0.05
WHEN param1 / param0 > 0.1 THEN param0 * 0.03
ELSE param0 * 0.01
END",
);
let with_udf = sales_df.create_udf(&[commission_udf])?;
let with_commission = with_udf.select_expr(&[
ColumnProjection::column("region"),
ColumnProjection::column("sales"),
ColumnProjection::column("profit"),
ColumnProjection::with_alias(
Expr::call(
"calculate_commission",
vec![Expr::col("sales"), Expr::col("profit")],
),
"commission",
),
])?;
println!("Sales with calculated commission:");
let result = with_commission.collect()?;
println!("{}", result);
println!("\n--- Example 5: Combining Operations ---");
let final_analysis = sales_df
.with_column("profit_pct",
Expr::col("profit").div(Expr::col("sales")).mul(Expr::lit(100.0))
)?
.filter_expr(
Expr::col("profit_pct").gt(Expr::lit(12.0))
)?
.select_expr(&[
ColumnProjection::column("region"),
ColumnProjection::column("product"),
ColumnProjection::column("sales"),
ColumnProjection::column("profit"),
ColumnProjection::column("profit_pct"),
ColumnProjection::with_alias(
Expr::col("profit")
.mul(Expr::lit(0.1))
.add(Expr::col("profit_pct").mul(Expr::lit(5.0))),
"bonus"
),
])?;
println!("Final analysis with combined operations:");
let result = final_analysis.collect()?;
println!("{}", result);
Ok(())
}
#[cfg(not(feature = "distributed"))]
fn main() {
println!("This example requires the 'distributed' feature flag to be enabled.");
println!("Please recompile with:");
println!(" cargo run --example distributed_expr_example --features distributed");
}