#[cfg(feature = "distributed")]
use pandrs::distributed::{window_functions, WindowFunctionExt};
#[cfg(feature = "distributed")]
use pandrs::distributed::{WindowFrame, WindowFrameBoundary};
#[cfg(feature = "distributed")]
use pandrs::error::Result;
#[cfg(feature = "distributed")]
use pandrs::{
distributed::{DistributedConfig, ToDistributed},
DataFrame,
};
#[cfg(feature = "distributed")]
/// Entry point when the `distributed` feature is enabled.
///
/// Builds a small test DataFrame, converts it to a distributed DataFrame and
/// runs five window-function demonstrations against it (row number, rank with
/// partitioning, a custom frame, chained window functions, and a running
/// total), printing every collected result with `{:?}`.
///
/// # Errors
///
/// Propagates any error returned by data creation, the distributed
/// conversion, a window-function call, or `collect()`.
fn main() -> Result<()> {
    println!("PandRS Distributed Window Functions Example");

    // NOTE(review): `mut` may be unnecessary if `to_distributed` only borrows
    // immutably -- confirm against the `ToDistributed` trait before removing.
    let mut df = create_test_data()?;
    println!("Original DataFrame:\n{:?}\n", df);

    // Two executors, partitions of five rows.
    let dist_config = DistributedConfig::default()
        .with_executor_count(2)
        .with_partition_size(5);
    let context = pandrs::distributed::datafusion::DataFusionContext::new(dist_config);
    let distributed = df.to_distributed(&context)?;

    // 1. Row number with neither partitioning nor ordering specified.
    println!("\n-- Row Number Example --");
    let row_numbered = distributed.window_function(
        window_functions::row_number(),
        "row_num",
        None,
        None,
    )?;
    let row_number_result = row_numbered.collect()?;
    println!("Row Number Result:\n{:?}\n", row_number_result);

    // 2. Rank inside each category, ordered by value.
    //    (The boolean in the ordering tuple is presumably an ascending
    //    flag -- TODO confirm against the window-function API.)
    println!("\n-- Rank with Partition By Example --");
    let ranked = distributed.window_function(
        window_functions::rank(),
        "rank",
        Some(vec!["category"]),
        Some(vec![("value", true)]),
    )?;
    let rank_result = ranked.collect()?;
    println!("Rank Result:\n{:?}\n", rank_result);

    // 3. Sum over a frame from two rows preceding up to the current row.
    println!("\n-- Custom Window Frame Example --");
    let three_row_frame = WindowFrame {
        start_bound: WindowFrameBoundary::Preceding(2),
        end_bound: WindowFrameBoundary::CurrentRow,
    };
    let framed = distributed.window_function_with_frame(
        window_functions::sum("value"),
        "sum_3_rows",
        Some(vec!["category"]),
        Some(vec![("id", true)]),
        three_row_frame,
    )?;
    let frame_result = framed.collect()?;
    println!("Window Frame Result:\n{:?}\n", frame_result);

    // 4. Three window functions applied one after another; each step is
    //    applied to the output of the previous one.
    println!("\n-- Multiple Window Functions Example --");
    let with_rank = distributed.window_function(
        window_functions::rank(),
        "rank_in_category",
        Some(vec!["category"]),
        Some(vec![("value", false)]),
    )?;
    let with_avg = with_rank.window_function(
        window_functions::avg("value"),
        "avg_value_in_category",
        Some(vec!["category"]),
        None,
    )?;
    let with_row_num = with_avg.window_function(
        window_functions::row_number(),
        "overall_row_num",
        None,
        Some(vec![("value", false)]),
    )?;
    let multi_result = with_row_num.collect()?;
    println!("Multiple Window Functions Result:\n{:?}\n", multi_result);

    // 5. Running total: frame from the unbounded start up to the current row,
    //    ordered by id, without partitioning.
    println!("\n-- Running Totals Example --");
    let cumulative_frame = WindowFrame {
        start_bound: WindowFrameBoundary::UnboundedPreceding,
        end_bound: WindowFrameBoundary::CurrentRow,
    };
    let running = distributed.window_function_with_frame(
        window_functions::sum("value"),
        "running_total",
        None,
        Some(vec![("id", true)]),
        cumulative_frame,
    )?;
    let running_result = running.collect()?;
    println!("Running Totals Result:\n{:?}\n", running_result);

    Ok(())
}
#[cfg(feature = "distributed")]
/// Builds the ten-row sample frame used by the example.
///
/// Columns, in insertion order:
/// - `id`: integers 1 through 10
/// - `value`: ten floating-point values
/// - `category`: one of `"A"`, `"B"`, `"C"` per row
///
/// # Errors
///
/// Propagates any error returned by `add_column`.
fn create_test_data() -> Result<DataFrame> {
    use pandrs::column::{Column, Float64Column, Int64Column, StringColumn};
    use pandrs::optimized::OptimizedDataFrame;

    let id_data: Vec<i64> = (1..=10).collect();
    let value_data = vec![
        55.0, 30.0, 40.0, 85.0, 60.0, 75.0, 45.0, 90.0, 25.0, 50.0,
    ];
    let category_data = ["A", "A", "B", "B", "C", "C", "A", "B", "C", "A"]
        .iter()
        .map(|label| label.to_string())
        .collect::<Vec<String>>();

    let mut frame = OptimizedDataFrame::new();
    frame.add_column(String::from("id"), Column::Int64(Int64Column::new(id_data)))?;
    frame.add_column(
        String::from("value"),
        Column::Float64(Float64Column::new(value_data)),
    )?;
    frame.add_column(
        String::from("category"),
        Column::String(StringColumn::new(category_data)),
    )?;

    // NOTE(review): the frame is built as `OptimizedDataFrame` while the
    // signature returns `DataFrame` -- confirm these resolve to the same type.
    Ok(frame)
}
#[cfg(not(feature = "distributed"))]
/// Fallback entry point used when the `distributed` feature is disabled.
///
/// Prints a two-line notice explaining how to rebuild the example with the
/// feature turned on.
fn main() {
    let notice = [
        "This example requires the 'distributed' feature flag to be enabled.",
        "Please recompile with 'cargo run --example distributed_window_example --features distributed'",
    ];
    for line in notice {
        println!("{}", line);
    }
}