use crate::Value;
use crate::error::{Error, Result};
use polars::prelude::*;
/// One deferred pipeline step: consumes a `LazyFrame` and returns the
/// transformed frame, or an error. Boxed so heterogeneous closures can share
/// one `Vec`; `Send + Sync` so the pipeline can be shared across threads.
type PipelineOp = Box<dyn Fn(LazyFrame) -> Result<LazyFrame> + Send + Sync>;

/// An ordered, reusable list of `LazyFrame` transformations.
///
/// Steps are recorded by the builder methods and replayed, in insertion
/// order, against an input frame when the pipeline is built or executed.
pub struct LazyPipeline {
    /// Recorded steps, applied first-to-last.
    operations: Vec<PipelineOp>,
}
impl LazyPipeline {
pub fn new() -> Self {
Self {
operations: Vec::new(),
}
}
pub fn select(mut self, columns: Vec<String>) -> Self {
self.operations.push(Box::new(move |lf: LazyFrame| {
let cols: Vec<Expr> = columns.iter().map(|c| col(c)).collect();
Ok(lf.select(&cols))
}));
self
}
pub fn filter(mut self, predicate: Expr) -> Self {
self.operations.push(Box::new(move |lf: LazyFrame| {
Ok(lf.filter(predicate.clone()))
}));
self
}
pub fn sort(mut self, column: String, descending: bool) -> Self {
self.operations.push(Box::new(move |lf: LazyFrame| {
let options = SortOptions {
descending,
nulls_last: false,
multithreaded: true,
maintain_order: false,
};
Ok(lf.sort(&column, options))
}));
self
}
pub fn head(mut self, n: u32) -> Self {
self.operations
.push(Box::new(move |lf: LazyFrame| Ok(lf.limit(n))));
self
}
pub fn tail(mut self, n: u32) -> Self {
self.operations
.push(Box::new(move |lf: LazyFrame| Ok(lf.tail(n))));
self
}
pub fn group_by(mut self, columns: Vec<String>) -> Self {
self.operations.push(Box::new(move |lf: LazyFrame| {
let group_cols: Vec<Expr> = columns.iter().map(|c| col(c)).collect();
Ok(lf.group_by(&group_cols).agg(&[]))
}));
self
}
pub fn with_column(mut self, name: String, expr: Expr) -> Self {
self.operations.push(Box::new(move |lf: LazyFrame| {
Ok(lf.with_column(expr.clone().alias(&name)))
}));
self
}
pub fn drop_columns(mut self, columns: Vec<String>) -> Self {
self.operations.push(Box::new(move |lf: LazyFrame| {
let cols_to_keep: Vec<Expr> = lf
.schema()
.map_err(|e| Error::operation(format!("Schema error: {}", e)))?
.iter_names()
.filter(|name| !columns.contains(&name.to_string()))
.map(|name| col(name))
.collect();
Ok(lf.select(&cols_to_keep))
}));
self
}
pub fn rename(mut self, old_name: String, new_name: String) -> Self {
self.operations.push(Box::new(move |lf: LazyFrame| {
Ok(lf.rename([&old_name], [&new_name]))
}));
self
}
pub fn build(&self, input: LazyFrame) -> Result<LazyFrame> {
let mut lf = input;
for operation in &self.operations {
lf = operation(lf)?;
}
Ok(lf)
}
pub fn execute(&self, input: LazyFrame) -> Result<DataFrame> {
let lf = self.build(input)?;
lf.collect()
.map_err(|e| Error::operation(format!("Polars error: {e}")))
}
pub fn execute_as_value(&self, input: Value) -> Result<Value> {
match input {
Value::LazyFrame(lf) => {
let result_lf = self.build(*lf)?;
Ok(Value::lazy_frame(result_lf))
}
Value::DataFrame(df) => {
let lf = df.lazy();
let result_lf = self.build(lf)?;
Ok(Value::lazy_frame(result_lf))
}
_ => Err(Error::operation(
"LazyPipeline requires DataFrame or LazyFrame input",
)),
}
}
pub fn execute_and_collect(&self, input: Value) -> Result<Value> {
match input {
Value::LazyFrame(lf) => {
let df = self.execute(*lf)?;
Ok(Value::dataframe(df))
}
Value::DataFrame(df) => {
let lf = df.lazy();
let result_df = self.execute(lf)?;
Ok(Value::dataframe(result_df))
}
_ => Err(Error::operation(
"LazyPipeline requires DataFrame or LazyFrame input",
)),
}
}
pub fn len(&self) -> usize {
self.operations.len()
}
pub fn is_empty(&self) -> bool {
self.operations.is_empty()
}
}
impl Default for LazyPipeline {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use polars::prelude::*;

    /// Three-row frame used by several tests: `id`, `value`, and string `extra`.
    fn sample_frame() -> DataFrame {
        DataFrame::new(vec![
            Series::new("id".into(), vec![1i64, 2, 3]),
            Series::new("value".into(), vec![10i64, 20, 30]),
            Series::new("extra".into(), vec!["a", "b", "c"]),
        ])
        .unwrap()
    }

    /// Reads an `i64` column out of `df` so tests can assert on contents,
    /// not just shape.
    fn i64_values(df: &DataFrame, name: &str) -> Vec<Option<i64>> {
        df.column(name).unwrap().i64().unwrap().into_iter().collect()
    }

    #[test]
    fn test_lazy_pipeline_basic() {
        let df = DataFrame::new(vec![
            Series::new("id".into(), vec![1i64, 2, 3, 4, 5]),
            Series::new("value".into(), vec![10i64, 20, 30, 40, 50]),
        ])
        .unwrap();
        let pipeline = LazyPipeline::new()
            .filter(col("value").gt(lit(20)))
            .sort("value".to_string(), false)
            .head(2);
        assert_eq!(pipeline.len(), 3);
        let result = pipeline.execute(df.lazy()).unwrap();
        assert_eq!(result.height(), 2);
        assert_eq!(result.width(), 2);
        // Filter keeps 30/40/50, ascending sort + head(2) keeps 30 and 40.
        assert_eq!(i64_values(&result, "value"), vec![Some(30), Some(40)]);
        assert_eq!(i64_values(&result, "id"), vec![Some(3), Some(4)]);
    }

    #[test]
    fn test_lazy_pipeline_select() {
        let pipeline = LazyPipeline::new().select(vec!["id".to_string(), "value".to_string()]);
        let result = pipeline.execute(sample_frame().lazy()).unwrap();
        assert_eq!(result.width(), 2);
        assert!(result.get_column_names().contains(&"id".into()));
        assert!(result.get_column_names().contains(&"value".into()));
        assert!(!result.get_column_names().contains(&"extra".into()));
    }

    #[test]
    fn test_lazy_pipeline_with_value() {
        let df = DataFrame::new(vec![
            Series::new("id".into(), vec![1i64, 2, 3]),
            Series::new("value".into(), vec![10i64, 20, 30]),
        ])
        .unwrap();
        let input = Value::dataframe(df);
        let pipeline = LazyPipeline::new().filter(col("value").gt(lit(10))).head(2);
        let result = pipeline.execute_and_collect(input).unwrap();
        match result {
            Value::DataFrame(df) => {
                assert_eq!(df.height(), 2);
                assert_eq!(i64_values(&df, "value"), vec![Some(20), Some(30)]);
            }
            _ => panic!("Expected DataFrame"),
        }
    }

    #[test]
    fn test_lazy_pipeline_tail() {
        let result = LazyPipeline::new()
            .tail(1)
            .execute(sample_frame().lazy())
            .unwrap();
        assert_eq!(i64_values(&result, "value"), vec![Some(30)]);
    }

    #[test]
    fn test_lazy_pipeline_drop_columns_ignores_unknown_names() {
        let pipeline = LazyPipeline::new()
            .drop_columns(vec!["extra".to_string(), "missing".to_string()]);
        let result = pipeline.execute(sample_frame().lazy()).unwrap();
        assert_eq!(result.width(), 2);
        assert!(result.get_column_names().contains(&"id".into()));
        assert!(result.get_column_names().contains(&"value".into()));
        assert!(!result.get_column_names().contains(&"extra".into()));
    }

    #[test]
    fn test_lazy_pipeline_rename() {
        let pipeline = LazyPipeline::new().rename("value".to_string(), "amount".to_string());
        let result = pipeline.execute(sample_frame().lazy()).unwrap();
        assert!(result.get_column_names().contains(&"amount".into()));
        assert!(!result.get_column_names().contains(&"value".into()));
        assert_eq!(
            i64_values(&result, "amount"),
            vec![Some(10), Some(20), Some(30)]
        );
    }

    #[test]
    fn test_lazy_pipeline_execute_as_value_stays_lazy() {
        let result = LazyPipeline::new()
            .head(1)
            .execute_as_value(Value::dataframe(sample_frame()))
            .unwrap();
        assert!(matches!(result, Value::LazyFrame(_)));
    }

    #[test]
    fn test_empty_pipeline_is_identity() {
        let pipeline = LazyPipeline::default();
        assert!(pipeline.is_empty());
        let result = pipeline.execute(sample_frame().lazy()).unwrap();
        assert_eq!(result.height(), 3);
        assert_eq!(result.width(), 3);
    }
}