Skip to main content

robin_sparkless_polars/plan/
mod.rs

1//! Plan interpreter: execute a serialized logical plan (list of ops) using the existing DataFrame API.
2//!
3//! See [LOGICAL_PLAN_FORMAT.md](../../../docs/LOGICAL_PLAN_FORMAT.md) for the plan and expression schema.
4
5mod expr;
6mod logical;
7
8use crate::dataframe::{DataFrame, JoinType, disambiguate_agg_output_names};
9use crate::functions::{
10    SortOrder, asc_nulls_first, asc_nulls_last, col, desc_nulls_first, desc_nulls_last,
11};
12use crate::plan::expr::{expr_from_value, try_column_from_udf_value};
13use crate::session::{SparkSession, set_thread_udf_session};
14pub use expr::PlanExprError;
15pub use logical::LogicalPlan;
16use polars::prelude::PolarsError;
17use robin_sparkless_core::engine::{DataFrameBackend, PlanExecutor as CorePlanExecutor};
18use robin_sparkless_core::error::EngineError;
19use serde_json::Value;
20
21/// Execute a logical plan: build initial DataFrame from (data, schema), then apply each op in sequence.
22///
23/// - `data`: rows as `Vec<Vec<Value>>` (each inner vec is one row; order matches schema).
24/// - `schema`: list of (column_name, dtype_string) e.g. `[("id", "bigint"), ("name", "string")]`.
25/// - `plan`: list of `{"op": "...", "payload": ...}` objects.
26///
27/// Returns the final DataFrame after applying all operations.
28pub fn execute_plan(
29    session: &SparkSession,
30    data: Vec<Vec<Value>>,
31    schema: Vec<(String, String)>,
32    plan: &[Value],
33) -> Result<DataFrame, PlanError> {
34    set_thread_udf_session(session.clone());
35    let mut df = session
36        .create_dataframe_from_rows(data, schema, false, false)
37        .map_err(PlanError::Session)?
38        .with_case_insensitive_column_resolution();
39
40    for op_value in plan {
41        let op_obj = op_value
42            .as_object()
43            .ok_or_else(|| PlanError::InvalidPlan("each plan step must be a JSON object".into()))?;
44        let op_name = op_obj
45            .get("op")
46            .and_then(Value::as_str)
47            .ok_or_else(|| PlanError::InvalidPlan("each plan step must have 'op' string".into()))?;
48        let mut payload = op_obj.get("payload").cloned().unwrap_or(Value::Null);
49        // Sparkless may put other_data/other_schema at op level (sibling to payload) or use camelCase.
50        // Merge into payload so apply_op finds them (issue #510).
51        if matches!(op_name, "join" | "union" | "unionByName" | "crossJoin") {
52            payload = merge_other_into_payload(payload, op_obj);
53        }
54
55        df = apply_op(session, df, op_name, payload)?;
56    }
57
58    Ok(df)
59}
60
61/// Polars-backed implementation of the core [`PlanExecutor`] trait.
62///
63/// This adapter allows high-level code (e.g. the root crate) to execute JSON plans using
64/// the engine-agnostic [`DataFrameBackend`] and [`EngineError`] types, while reusing the
65/// existing Polars plan interpreter.
66pub struct PolarsPlanExecutor;
67
68impl CorePlanExecutor<SparkSession> for PolarsPlanExecutor {
69    fn execute_plan(
70        session: &SparkSession,
71        data: Vec<Vec<Value>>,
72        schema: Vec<(String, String)>,
73        plan: &[Value],
74    ) -> Result<Box<dyn DataFrameBackend>, EngineError> {
75        // Delegate to the existing JSON plan interpreter and then box the backend DataFrame.
76        let df = execute_plan(session, data, schema, plan)
77            .map_err(|e| EngineError::Internal(e.to_string()))?;
78        Ok(Box::new(df))
79    }
80}
81
82/// Errors from plan execution.
83#[derive(Debug)]
84pub enum PlanError {
85    Session(PolarsError),
86    Expr(PlanExprError),
87    InvalidPlan(String),
88    UnsupportedOp(String),
89}
90
91impl std::fmt::Display for PlanError {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        match self {
94            PlanError::Session(e) => write!(f, "session/df: {e}"),
95            PlanError::Expr(e) => write!(f, "expression: {e}"),
96            PlanError::InvalidPlan(s) => write!(f, "invalid plan: {s}"),
97            PlanError::UnsupportedOp(s) => write!(f, "unsupported op: {s}"),
98        }
99    }
100}
101
102impl std::error::Error for PlanError {}
103
104/// Merge other_data/other_schema from op into payload if missing. Supports snake_case and camelCase.
105fn merge_other_into_payload(payload: Value, op: &serde_json::Map<String, Value>) -> Value {
106    fn get(obj: &serde_json::Map<String, Value>, snake: &str, camel: &str) -> Option<Value> {
107        obj.get(snake).or_else(|| obj.get(camel)).cloned()
108    }
109    let mut p = match payload {
110        Value::Object(m) => m,
111        _ => return payload,
112    };
113    if p.get("other_data").or_else(|| p.get("otherData")).is_none() {
114        if let Some(v) = get(op, "other_data", "otherData") {
115            p.insert("other_data".into(), v);
116        }
117    }
118    if p.get("other_schema")
119        .or_else(|| p.get("otherSchema"))
120        .is_none()
121    {
122        if let Some(v) = get(op, "other_schema", "otherSchema") {
123            p.insert("other_schema".into(), v);
124        }
125    }
126    if p.get("on").is_none() {
127        if let Some(v) = get(op, "on", "on") {
128            p.insert("on".into(), v);
129        }
130    }
131    Value::Object(p)
132}
133
134/// Get other_data from payload (snake_case or camelCase).
135fn get_other_data(payload: &Value) -> Option<&Vec<Value>> {
136    payload
137        .get("other_data")
138        .or_else(|| payload.get("otherData"))
139        .and_then(Value::as_array)
140}
141
142/// Get other_schema from payload (snake_case or camelCase).
143fn get_other_schema(payload: &Value) -> Option<&Vec<Value>> {
144    payload
145        .get("other_schema")
146        .or_else(|| payload.get("otherSchema"))
147        .and_then(Value::as_array)
148}
149
150/// Parse one orderBy column element into zero or more (column_name, ascending) pairs.
151/// Accepts: "col", "col DESC", "col ASC", "['a','b']" (Python repr), {"col":"x"}, {"name":"x"}.
152fn parse_order_by_element(v: &Value) -> Option<Vec<(String, bool)>> {
153    if let Some(s) = v.as_str() {
154        let s = s.trim();
155        if s.eq_ignore_ascii_case("desc") || s.eq_ignore_ascii_case("asc") {
156            return None;
157        }
158        if s.to_uppercase().ends_with(" DESC") {
159            let name = s[..s.len().saturating_sub(5)].trim().to_string();
160            return if name.is_empty() {
161                None
162            } else {
163                Some(vec![(name, false)])
164            };
165        }
166        if s.to_uppercase().ends_with(" ASC") {
167            let name = s[..s.len().saturating_sub(4)].trim().to_string();
168            return if name.is_empty() {
169                None
170            } else {
171                Some(vec![(name, true)])
172            };
173        }
174        if s.starts_with('[') && s.ends_with(']') {
175            let inner = s[1..s.len() - 1].trim();
176            if inner.is_empty() {
177                return Some(vec![]);
178            }
179            let names: Vec<(String, bool)> = inner
180                .split(',')
181                .map(|p| {
182                    (
183                        p.trim().trim_matches('\'').trim_matches('"').to_string(),
184                        true,
185                    )
186                })
187                .filter(|(n, _)| !n.is_empty())
188                .collect();
189            return Some(names);
190        }
191        return Some(vec![(s.to_string(), true)]);
192    }
193    let obj = v.as_object()?;
194    let name = obj
195        .get("col")
196        .or_else(|| obj.get("name"))
197        .and_then(Value::as_str)
198        .map(|s| s.to_string())?;
199    Some(vec![(name, true)])
200}
201
202/// Extract column name from expression if it is a simple column reference {"col": "name"}.
203fn expr_to_col_name(v: &Value) -> Option<String> {
204    let obj = v.as_object()?;
205    obj.get("col")
206        .or_else(|| obj.get("column"))
207        .and_then(Value::as_str)
208        .map(|s| s.to_string())
209}
210
211/// Parse join "on" into list of column names. Accepts:
212/// - string -> [s]; array of strings -> those; array of {"col": "x"} -> ["x"];
213/// - array of {"op": "eq", "left": {"col": "a"}, "right": {"col": "a"}} -> ["a"] (Sparkless v4 format, #552).
214///   #704, #698: Reject expression-like strings (e.g. array_contains(...)) with clear error.
215fn parse_join_on(on: &Value, df: &DataFrame) -> Result<Vec<String>, PlanError> {
216    if let Some(s) = on.as_str() {
217        if s.contains('(') {
218            return Err(PlanError::InvalidPlan(
219                "join on expression (e.g. array_contains(...) or column expr) is not supported; use column names only".into(),
220            ));
221        }
222        let resolved = df.resolve_column_name(s).map_err(PlanError::Session)?;
223        return Ok(vec![resolved]);
224    }
225    let arr = on.as_array().ok_or_else(|| {
226        PlanError::InvalidPlan(
227            "join 'on' must be string, array of strings, or array of column refs / eq expressions"
228                .into(),
229        )
230    })?;
231    let mut keys = Vec::with_capacity(arr.len());
232    for v in arr {
233        if let Some(s) = v.as_str() {
234            let resolved = df.resolve_column_name(s).map_err(PlanError::Session)?;
235            keys.push(resolved);
236            continue;
237        }
238        if let Some(obj) = v.as_object() {
239            // {"col": "x"} -> single key for both sides
240            if let Some(name) = expr_to_col_name(v) {
241                let resolved = df.resolve_column_name(&name).map_err(PlanError::Session)?;
242                keys.push(resolved);
243                continue;
244            }
245            // {"op": "eq"|"==", "left": {"col": "a"}, "right": {"col": "a"}} (Sparkless v4)
246            let op = obj
247                .get("op")
248                .or_else(|| obj.get("operator"))
249                .and_then(Value::as_str);
250            if op.map(|o| o == "eq" || o == "==").unwrap_or(false) {
251                let left = obj.get("left").and_then(expr_to_col_name);
252                let right = obj.get("right").and_then(expr_to_col_name);
253                if let (Some(l), Some(r)) = (left, right) {
254                    if l == r {
255                        let resolved = df.resolve_column_name(&l).map_err(PlanError::Session)?;
256                        keys.push(resolved);
257                        continue;
258                    }
259                }
260            }
261        }
262        return Err(PlanError::InvalidPlan(
263            "join 'on' element must be string, {\"col\": \"name\"}, or {\"op\": \"eq\", \"left\": {\"col\": \"x\"}, \"right\": {\"col\": \"x\"}}".into(),
264        ));
265    }
266    Ok(keys)
267}
268
269/// Convert other_data to rows. Accepts arrays [[v,v],[v,v]] or dicts [{"col":v},...] (Sparkless may send dict rows).
270fn other_data_to_rows(other_data: &[Value], schema_names: &[String]) -> Vec<Vec<Value>> {
271    other_data
272        .iter()
273        .filter_map(|v| {
274            if let Some(arr) = v.as_array() {
275                return Some(arr.clone());
276            }
277            if let Some(obj) = v.as_object() {
278                let row: Vec<Value> = schema_names
279                    .iter()
280                    .map(|n| obj.get(n).cloned().unwrap_or(Value::Null))
281                    .collect();
282                return Some(row);
283            }
284            None
285        })
286        .collect()
287}
288
289/// Parse (name, type) from schema field object. Supports {"name","type"} and {"fieldName","dataType"} (Sparkless).
290fn schema_field_to_pair(v: &Value) -> Option<(String, String)> {
291    let obj = v.as_object()?;
292    let name = obj
293        .get("name")
294        .or_else(|| obj.get("fieldName"))
295        .and_then(Value::as_str)?
296        .to_string();
297    let ty = obj
298        .get("type")
299        .or_else(|| obj.get("dataType"))
300        .and_then(Value::as_str)
301        .or_else(|| {
302            // dataType may be nested: {"type":"string"} or {"typeName":"string"}
303            obj.get("dataType")?.get("typeName").and_then(Value::as_str)
304        })?
305        .to_string();
306    Some((name, ty))
307}
308
309fn apply_op(
310    session: &SparkSession,
311    df: DataFrame,
312    op_name: &str,
313    payload: Value,
314) -> Result<DataFrame, PlanError> {
315    match op_name {
316        "stop" => {
317            let _ = payload;
318            session.stop();
319            Ok(df)
320        }
321        "filter" => {
322            let expr = expr_from_value(&payload).map_err(PlanError::Expr)?;
323            df.filter(expr).map_err(PlanError::Session)
324        }
325        "select" => {
326            // Accept payload as array or as object with "columns" (Sparkless: {"columns": [...]}).
327            // #689, #688: Accept mixed Column/string: each item may be string, {col/column/name}, or {name, expr}.
328            let arr = payload
329                .as_array()
330                .or_else(|| payload.get("columns").and_then(Value::as_array));
331            if let Some(arr) = arr {
332                if arr.is_empty() {
333                    return Err(PlanError::InvalidPlan(
334                        "select payload must be non-empty array".into(),
335                    ));
336                }
337                let mut exprs = Vec::with_capacity(arr.len());
338                for (idx, v) in arr.iter().enumerate() {
339                    if let Some(obj) = v.as_object() {
340                        if let Some(expr_val) = obj.get("expr") {
341                            // Column-like expression: {name: "<alias>", expr: <expr>}
342                            let name = obj
343                                .get("name")
344                                .and_then(Value::as_str)
345                                .unwrap_or("_c"); // default alias if Sparkless omits
346                            let expr = expr_from_value(expr_val).map_err(PlanError::Expr)?;
347                            let resolved = df
348                                .resolve_expr_column_names(expr)
349                                .map_err(PlanError::Session)?;
350                            exprs.push(resolved.alias(name));
351                            continue;
352                        }
353                        // #970: accept bare expression object (e.g. {"fn": "alias", "args": [...]} or {"op": "add", ...})
354                        if obj.contains_key("fn") || obj.contains_key("op") {
355                            if let Ok(expr) = expr_from_value(v) {
356                                let resolved = df
357                                    .resolve_expr_column_names(expr)
358                                    .map_err(PlanError::Session)?;
359                                let alias = obj
360                                    .get("fn")
361                                    .and_then(Value::as_str)
362                                    .filter(|s| *s == "alias")
363                                    .and_then(|_| obj.get("args").and_then(Value::as_array))
364                                    .and_then(|a| a.last())
365                                    .and_then(Value::as_str)
366                                    .map(String::from)
367                                    .unwrap_or_else(|| format!("_c{idx}"));
368                                exprs.push(resolved.alias(alias));
369                                continue;
370                            }
371                        }
372                    }
373                    // Column name: string or {col/column/name}
374                    let name_str: String = if let Some(s) = v.as_str() {
375                        s.to_string()
376                    } else if let Some(obj) = v.as_object() {
377                        expr_to_col_name(v)
378                            .or_else(|| obj.get("name").and_then(Value::as_str).map(String::from))
379                            .ok_or_else(|| {
380                                PlanError::InvalidPlan(
381                                    "select item must be string, {col/column/name}, or {name, expr}".into(),
382                                )
383                            })?
384                    } else {
385                        return Err(PlanError::InvalidPlan(
386                            "select payload must be list of column name strings or {name, expr} or {col/column/name} objects".into(),
387                        ));
388                    };
389                    // #794: "concat(a, b) as full_name" -> parse concat expr if present
390                    let s = name_str.trim();
391                    let (expr_str, alias_override) = if let Some(ix) = s.rfind(" as ") {
392                        (s[..ix].trim(), Some(s[ix + 4..].trim())) // 4 = len(" as ")
393                    } else {
394                        (s, None)
395                    };
396                    if let Some(expr) =
397                        crate::plan::expr::try_parse_concat_expr_from_string(expr_str)
398                    {
399                        let resolved = df
400                            .resolve_expr_column_names(expr)
401                            .map_err(PlanError::Session)?;
402                        let alias = alias_override.unwrap_or(s);
403                        exprs.push(resolved.alias(alias));
404                    } else {
405                        // #1055: col("StructValue.E1") / dot notation – resolve as expression so struct field access works.
406                        let col_expr = polars::prelude::col::<&str>(expr_str);
407                        let resolved = df
408                            .resolve_expr_column_names(col_expr)
409                            .map_err(PlanError::Session)?;
410                        // #1014, #1022: Preserve requested column name as output (e.g. "NaMe", "StructValue.E1") so row keys match.
411                        exprs.push(resolved.alias(name_str.as_str()));
412                    }
413                }
414                df.select_exprs(exprs).map_err(PlanError::Session)
415            } else {
416                Err(PlanError::InvalidPlan(
417                    "select payload must be array of column names or {name, expr} objects, or object with 'columns' array".into(),
418                ))
419            }
420        }
421        "limit" => {
422            let n = payload.get("n").and_then(Value::as_u64).ok_or_else(|| {
423                PlanError::InvalidPlan("limit payload must have 'n' number".into())
424            })?;
425            df.limit(n as usize).map_err(PlanError::Session)
426        }
427        "offset" => {
428            let n = payload.get("n").and_then(Value::as_u64).unwrap_or(0);
429            df.offset(n as usize).map_err(PlanError::Session)
430        }
431        "orderBy" => {
432            let columns = payload
433                .get("columns")
434                .and_then(Value::as_array)
435                .ok_or_else(|| {
436                    PlanError::InvalidPlan("orderBy payload must have 'columns' array".into())
437                })?;
438            // Each element: string ("col", "col DESC", "col ASC", "['a','b']"), or object {"col":"name"} / {"name":"x"} (PR-D).
439            let mut pairs: Vec<(String, bool)> = Vec::new();
440            for v in columns.iter() {
441                if let Some(parsed) = parse_order_by_element(v) {
442                    pairs.extend(parsed);
443                }
444            }
445            if pairs.is_empty() {
446                return Err(PlanError::InvalidPlan(
447                    "orderBy columns could not be parsed (expect column names, 'col ASC'/'col DESC', or ['a','b'])".into(),
448                ));
449            }
450            let col_names: Vec<String> = pairs
451                .iter()
452                .map(|(s, _)| df.resolve_column_name(s.as_str()))
453                .collect::<Result<Vec<_>, _>>()
454                .map_err(PlanError::Session)?;
455            let ascending: Vec<bool> = pairs.iter().map(|(_, asc)| *asc).collect();
456            let nulls_last = payload
457                .get("nulls_last")
458                .and_then(Value::as_array)
459                .map(|a| a.iter().filter_map(|v| v.as_bool()).collect::<Vec<_>>());
460            if let Some(nl) = nulls_last {
461                let mut sort_orders: Vec<SortOrder> = Vec::with_capacity(col_names.len());
462                for (i, name) in col_names.iter().enumerate() {
463                    let asc = ascending.get(i).copied().unwrap_or(true);
464                    let nlast = nl.get(i).copied().unwrap_or(asc);
465                    let column = col(name.as_str());
466                    let so = if asc {
467                        if nlast {
468                            asc_nulls_last(&column)
469                        } else {
470                            asc_nulls_first(&column)
471                        }
472                    } else if nlast {
473                        desc_nulls_last(&column)
474                    } else {
475                        desc_nulls_first(&column)
476                    };
477                    sort_orders.push(so);
478                }
479                df.order_by_exprs(sort_orders).map_err(PlanError::Session)
480            } else {
481                let refs: Vec<&str> = col_names.iter().map(|s| s.as_str()).collect();
482                df.order_by(refs, ascending).map_err(PlanError::Session)
483            }
484        }
485        "distinct" => df.distinct(None).map_err(PlanError::Session),
486        "drop" => {
487            let columns = payload
488                .get("columns")
489                .and_then(Value::as_array)
490                .ok_or_else(|| {
491                    PlanError::InvalidPlan("drop payload must have 'columns' array".into())
492                })?;
493            let names: Vec<String> = columns
494                .iter()
495                .filter_map(|v| {
496                    v.as_str().map(String::from).or_else(|| expr_to_col_name(v))
497                })
498                .map(|s| df.resolve_column_name(s.as_str()))
499                .collect::<Result<Vec<_>, _>>()
500                .map_err(PlanError::Session)?;
501            let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
502            df.drop(refs).map_err(PlanError::Session)
503        }
504        "withColumnRenamed" => {
505            let old_name = payload.get("old").and_then(Value::as_str).ok_or_else(|| {
506                PlanError::InvalidPlan("withColumnRenamed must have 'old'".into())
507            })?;
508            let new_name = payload.get("new").and_then(Value::as_str).ok_or_else(|| {
509                PlanError::InvalidPlan("withColumnRenamed must have 'new'".into())
510            })?;
511            let resolved_old = df
512                .resolve_column_name(old_name)
513                .map_err(PlanError::Session)?;
514            df.with_column_renamed(&resolved_old, new_name)
515                .map_err(PlanError::Session)
516        }
517        "withColumn" => {
518            // #767, #766: column name from "name" or "alias" so cast column appears in schema.
519            let name = payload
520                .get("name")
521                .or_else(|| payload.get("alias"))
522                .and_then(Value::as_str)
523                .ok_or_else(|| PlanError::InvalidPlan("withColumn must have 'name' or 'alias'".into()))?;
524            let expr_val = payload
525                .get("expr")
526                .ok_or_else(|| PlanError::InvalidPlan("withColumn must have 'expr'".into()))?;
527            if let Some(res) = try_column_from_udf_value(expr_val) {
528                let col = res.map_err(PlanError::Expr)?;
529                df.with_column(name, &col).map_err(PlanError::Session)
530            } else {
531                let expr = expr_from_value(expr_val).map_err(PlanError::Expr)?;
532                df.with_column_expr(name, expr).map_err(PlanError::Session)
533            }
534        }
535        "groupBy" => {
536            let group_by = payload
537                .get("group_by")
538                .and_then(Value::as_array)
539                .ok_or_else(|| {
540                    PlanError::InvalidPlan("groupBy must have 'group_by' array".into())
541                })?;
542            // Each element: string, {"col"/"name": "x"}, or {"expr": <expr>} (#800: Column/expr as group key).
543            let cols: Vec<String> = group_by
544                .iter()
545                .filter_map(|v| {
546                    v.as_str()
547                        .map(|s| s.to_string())
548                        .or_else(|| {
549                            v.get("col")
550                                .and_then(Value::as_str)
551                                .map(|s| s.to_string())
552                                .or_else(|| {
553                                    v.get("name").and_then(Value::as_str).map(|s| s.to_string())
554                                })
555                        })
556                        .or_else(|| {
557                            // Expression form: resolve to output column name for group key.
558                            v.get("expr")
559                                .and_then(|e| expr_from_value(e).ok())
560                                .and_then(|expr| {
561                                    polars_plan::utils::expr_output_name(&expr)
562                                        .ok()
563                                        .map(|s| s.as_str().to_string())
564                                })
565                        })
566                })
567                .map(|s| df.resolve_column_name(s.as_str()))
568                .collect::<Result<Vec<_>, _>>()
569                .map_err(PlanError::Session)?;
570            let refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
571            let grouped = df.group_by(refs).map_err(PlanError::Session)?;
572            // Sparkless may send "aggregations" instead of "aggs" (fixes #828–#838).
573            let aggs = payload
574                .get("aggs")
575                .or_else(|| payload.get("aggregations"))
576                .and_then(Value::as_array);
577            match aggs {
578                Some(aggs_arr) => {
579                    let agg_exprs = parse_aggs(aggs_arr, &df)?;
580                    let disambiguated = disambiguate_agg_output_names(agg_exprs);
581                    grouped.agg(disambiguated).map_err(PlanError::Session)
582                }
583                None => Err(PlanError::InvalidPlan(
584                    "groupBy payload must include 'aggs' array (e.g. [{\"agg\": \"sum\", \"column\": \"b\"}])".into(),
585                )),
586            }
587        }
588        "join" => handle_join_op(session, df, payload),
589        "union" => handle_union_op(session, df, payload),
590        "unionByName" => handle_union_by_name_op(session, df, payload),
591        "crossJoin" | "cross_join" => handle_cross_join_op(session, df, payload),
592        "rollup" => Err(PlanError::UnsupportedOp(
593            "Plan op 'rollup' is not yet supported. Use groupBy for now. See docs for supported operations.".into(),
594        )),
595        "cube" => Err(PlanError::UnsupportedOp(
596            "Plan op 'cube' is not yet supported. Use groupBy for now. See docs for supported operations.".into(),
597        )),
598        _ => Err(PlanError::UnsupportedOp(format!(
599            "Plan op '{op_name}' is not supported. See docs for supported operations (e.g. select, filter, groupBy, join, orderBy, limit)."
600        ))),
601    }
602}
603
604fn handle_join_op(
605    session: &SparkSession,
606    df: DataFrame,
607    payload: Value,
608) -> Result<DataFrame, PlanError> {
609    let other_data = get_other_data(&payload)
610        .ok_or_else(|| PlanError::InvalidPlan("join must have 'other_data'".into()))?;
611    let other_schema = get_other_schema(&payload)
612        .ok_or_else(|| PlanError::InvalidPlan("join must have 'other_schema'".into()))?;
613    let on = payload
614        .get("on")
615        .ok_or_else(|| PlanError::InvalidPlan("join must have 'on' array or string".into()))?;
616    let how = payload
617        .get("how")
618        .and_then(Value::as_str)
619        .unwrap_or("inner");
620
621    let schema_vec: Vec<(String, String)> = other_schema
622        .iter()
623        .filter_map(schema_field_to_pair)
624        .collect();
625    let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
626    let rows = other_data_to_rows(other_data, &schema_names);
627    let mut other_df = session
628        .create_dataframe_from_rows(rows, schema_vec, false, false)
629        .map_err(PlanError::Session)?;
630
631    let on_keys_left = parse_join_on(on, &df)?;
632    // Align right join key column names to left's (e.g. left "Dept_Id" vs right "dept_id" -> rename right to "Dept_Id") (#552).
633    let on_keys_right = parse_join_on(on, &other_df)?;
634    for (i, left_name) in on_keys_left.iter().enumerate() {
635        if let Some(right_name) = on_keys_right.get(i) {
636            if left_name != right_name {
637                other_df = other_df
638                    .with_column_renamed(right_name, left_name)
639                    .map_err(PlanError::Session)?;
640            }
641        }
642    }
643    // After renaming, left and right join key names align; treat this like an equality-based
644    // join on the same key columns so that outer join key semantics match PySpark and the
645    // parity fixtures (e.g. case_outer_join in gen_pyspark_cases.py).
646    let left_refs: Vec<&str> = on_keys_left.iter().map(|s| s.as_str()).collect();
647    let right_refs: Vec<&str> = on_keys_left.iter().map(|s| s.as_str()).collect();
648    let join_type = match how {
649        "left" => JoinType::Left,
650        "right" => JoinType::Right,
651        "outer" => JoinType::Outer,
652        "left_semi" | "leftsemi" | "semi" => JoinType::LeftSemi,
653        "left_anti" | "leftanti" | "anti" => JoinType::LeftAnti,
654        _ => JoinType::Inner,
655    };
656    df.join_with_keys(&other_df, left_refs, right_refs, join_type, false)
657        .map_err(PlanError::Session)
658}
659
660fn handle_union_op(
661    session: &SparkSession,
662    df: DataFrame,
663    payload: Value,
664) -> Result<DataFrame, PlanError> {
665    let other_data = get_other_data(&payload)
666        .ok_or_else(|| PlanError::InvalidPlan("union must have 'other_data'".into()))?;
667    let other_schema = get_other_schema(&payload)
668        .ok_or_else(|| PlanError::InvalidPlan("union must have 'other_schema'".into()))?;
669    let schema_vec: Vec<(String, String)> = other_schema
670        .iter()
671        .filter_map(schema_field_to_pair)
672        .collect();
673    let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
674    let rows = other_data_to_rows(other_data, &schema_names);
675    let other_df = session
676        .create_dataframe_from_rows(rows, schema_vec, false, false)
677        .map_err(PlanError::Session)?;
678    df.union(&other_df).map_err(PlanError::Session)
679}
680
681fn handle_union_by_name_op(
682    session: &SparkSession,
683    df: DataFrame,
684    payload: Value,
685) -> Result<DataFrame, PlanError> {
686    let other_data = get_other_data(&payload)
687        .ok_or_else(|| PlanError::InvalidPlan("unionByName must have 'other_data'".into()))?;
688    let other_schema = get_other_schema(&payload)
689        .ok_or_else(|| PlanError::InvalidPlan("unionByName must have 'other_schema'".into()))?;
690    let schema_vec: Vec<(String, String)> = other_schema
691        .iter()
692        .filter_map(schema_field_to_pair)
693        .collect();
694    let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
695    let rows = other_data_to_rows(other_data, &schema_names);
696    let other_df = session
697        .create_dataframe_from_rows(rows, schema_vec, false, false)
698        .map_err(PlanError::Session)?;
699    df.union_by_name(&other_df, true)
700        .map_err(PlanError::Session)
701}
702
703fn handle_cross_join_op(
704    session: &SparkSession,
705    df: DataFrame,
706    payload: Value,
707) -> Result<DataFrame, PlanError> {
708    let other_data = get_other_data(&payload)
709        .ok_or_else(|| PlanError::InvalidPlan("crossJoin must have 'other_data'".into()))?;
710    let other_schema = get_other_schema(&payload)
711        .ok_or_else(|| PlanError::InvalidPlan("crossJoin must have 'other_schema'".into()))?;
712    let schema_vec: Vec<(String, String)> = other_schema
713        .iter()
714        .filter_map(schema_field_to_pair)
715        .collect();
716    let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
717    let rows = other_data_to_rows(other_data, &schema_names);
718    let other_df = session
719        .create_dataframe_from_rows(rows, schema_vec, false, false)
720        .map_err(PlanError::Session)?;
721    df.cross_join(&other_df).map_err(PlanError::Session)
722}
723
724fn parse_aggs(aggs: &[Value], df: &DataFrame) -> Result<Vec<polars::prelude::Expr>, PlanError> {
725    use crate::Column;
726    use crate::functions::{avg, count, first as rs_first, max, min, sum as rs_sum};
727    use polars::prelude::len;
728    use std::collections::HashMap;
729
730    let mut out = Vec::with_capacity(aggs.len());
731    let mut alias_count: HashMap<String, u32> = HashMap::new();
732    for a in aggs {
733        let obj = a
734            .as_object()
735            .ok_or_else(|| PlanError::InvalidPlan("each agg must be an object".into()))?;
736        // Full expression tree (e.g. cast+alias from Sparkless); issue #1255 / #332.
737        if let Some(expr_val) = obj.get("expr") {
738            let expr = expr_from_value(expr_val).map_err(PlanError::Expr)?;
739            let resolved = df
740                .resolve_expr_column_names(expr)
741                .map_err(PlanError::Session)?;
742            out.push(resolved);
743            continue;
744        }
745        // Sparkless may send "func" instead of "agg" (e.g. groupby_first_last; fixes #828–#838).
746        let agg = obj
747            .get("agg")
748            .or_else(|| obj.get("func"))
749            .and_then(Value::as_str)
750            .ok_or_else(|| PlanError::InvalidPlan("agg must have 'agg' or 'func' string".into()))?;
751
752        if agg == "python_grouped_udf" {
753            // Grouped Python UDF aggregations are not expressible as pure Expr; the plan
754            // interpreter currently supports only Rust/built-in aggregations at this level.
755            return Err(PlanError::InvalidPlan(
756                "python_grouped_udf aggregations are not yet supported in execute_plan; use built-in aggregations in plans for now".into(),
757            ));
758        }
759
760        let col_name = obj.get("column").and_then(Value::as_str);
761        let c = match col_name {
762            Some(name) => {
763                let resolved = df.resolve_column_name(name).map_err(PlanError::Session)?;
764                Column::new(resolved)
765            }
766            None => {
767                if agg == "count" {
768                    Column::new("".to_string()) // count() without column
769                } else {
770                    return Err(PlanError::InvalidPlan(format!(
771                        "agg '{agg}' requires 'column'"
772                    )));
773                }
774            }
775        };
776        // count() without column = row count per group (PySpark count(*)); use len() (#825, #824). Cast to Int64 for LongType (#734).
777        let col_expr = match agg {
778            "count" if col_name.map(|s| s.is_empty()).unwrap_or(true) => Column::from_expr(
779                len().cast(polars::prelude::DataType::Int64),
780                Some("count".to_string()),
781            ),
782            "count" => count(&c),
783            "sum" => {
784                // Integer columns: keep Int64 (PySpark sum of int -> long). Else cast to Float64 (string/numeric parity #393).
785                let keep_int = col_name
786                    .and_then(|n| df.get_column_data_type(n))
787                    .map(|dt| {
788                        matches!(
789                            dt,
790                            crate::schema::DataType::Long | crate::schema::DataType::Integer
791                        )
792                    })
793                    .unwrap_or(false);
794                if keep_int {
795                    let name = c.name().to_string();
796                    Column::from_expr(c.into_expr().sum(), Some(format!("sum({})", name)))
797                } else {
798                    rs_sum(&c)
799                }
800            }
801            "avg" => avg(&c),
802            "min" => min(&c),
803            "max" => max(&c),
804            "first" => {
805                let ignorenulls = obj
806                    .get("ignorenulls")
807                    .and_then(Value::as_bool)
808                    .unwrap_or(false);
809                rs_first(&c, ignorenulls)
810            }
811            "last" => Column::from_expr(c.into_expr().last(), None),
812            _ => return Err(PlanError::InvalidPlan(format!("unsupported agg: {agg}"))),
813        };
814        let mut expr = col_expr.into_expr();
815        // Optional cast (e.g. {"agg": "sum", "column": "Score", "alias": "TotalScore", "cast": "int"}); #1255, #332.
816        if let Some(cast_type) = obj
817            .get("cast")
818            .or_else(|| obj.get("cast_type"))
819            .and_then(Value::as_str)
820        {
821            let dtype =
822                crate::functions::parse_type_name(cast_type).map_err(PlanError::InvalidPlan)?;
823            expr = expr.strict_cast(dtype);
824        }
825        // #672, #791: PySpark-style result column names (e.g. avg(Value)) when plan does not set alias.
826        // #777: Deduplicate aliases so multiple count() etc. get count, count_1, count_2, ...
827        // Accept "name" as fallback for "alias" (Sparkless may send either).
828        let base_alias = obj
829            .get("alias")
830            .or_else(|| obj.get("name"))
831            .and_then(Value::as_str)
832            .map(String::from)
833            .unwrap_or_else(|| match (agg, col_name) {
834                ("count", None) => "count".to_string(),
835                (a, Some(col)) => format!("{}({})", a, col),
836                (a, None) => format!("{}({})", a, ""),
837            });
838        let count = alias_count.entry(base_alias.clone()).or_insert(0);
839        *count += 1;
840        let alias = if *count == 1 {
841            base_alias
842        } else {
843            format!("{}_{}", base_alias, *count - 1)
844        };
845        expr = expr.alias(&alias);
846        out.push(expr);
847    }
848    Ok(out)
849}
850
851#[cfg(test)]
852mod tests {
853    use super::*;
854    use serde_json::json;
855
856    /// #672: groupBy + agg without alias produces PySpark-style column name (e.g. avg(Value)).
857    #[test]
858    fn test_groupby_agg_column_name_avg_value() {
859        let session = crate::session::SparkSession::builder()
860            .app_name("plan_agg_alias")
861            .get_or_create();
862        let data = vec![
863            vec![json!("Alice"), json!(5.0)],
864            vec![json!("Alice"), json!(6.0)],
865            vec![json!("Bob"), json!(5.0)],
866        ];
867        let schema = vec![
868            ("Name".to_string(), "string".to_string()),
869            ("Value".to_string(), "double".to_string()),
870        ];
871        let plan = vec![json!({
872            "op": "groupBy",
873            "payload": {
874                "group_by": ["Name"],
875                "aggs": [{"agg": "avg", "column": "Value"}]
876            }
877        })];
878        let df = execute_plan(&session, data, schema, &plan).unwrap();
879        let out = df.collect_inner().unwrap();
880        let names = out.get_column_names();
881        assert!(
882            names.iter().any(|s| s.as_str() == "avg(Value)"),
883            "expected column 'avg(Value)' in {:?}",
884            names
885        );
886    }
887
888    #[test]
889    fn test_cross_join_plan() {
890        let session = crate::session::SparkSession::builder()
891            .app_name("plan_cross_join")
892            .get_or_create();
893        let data = vec![vec![json!(1)], vec![json!(2)]];
894        let schema = vec![("a".to_string(), "bigint".to_string())];
895        let plan = vec![json!({
896            "op": "crossJoin",
897            "payload": {
898                "other_data": [[3], [4]],
899                "other_schema": [{"name": "b", "type": "bigint"}]
900            }
901        })];
902        let df = execute_plan(&session, data, schema, &plan).unwrap();
903        let out = df.collect_inner().unwrap();
904        assert_eq!(out.height(), 4, "cross join 2x2 = 4 rows");
905        assert_eq!(out.get_column_names(), &["a", "b"]);
906    }
907
908    /// #704, #698: Join with expression in "on" (e.g. array_contains) returns clear error.
909    #[test]
910    fn test_join_on_expression_returns_clear_error() {
911        let session = crate::session::SparkSession::builder()
912            .app_name("plan_join_on_expr")
913            .get_or_create();
914        let data = vec![vec![json!(1), json!("a")]];
915        let schema = vec![
916            ("id".to_string(), "bigint".to_string()),
917            ("x".to_string(), "string".to_string()),
918        ];
919        let plan = vec![json!({
920            "op": "join",
921            "payload": {
922                "on": "array_contains(col, x)",
923                "how": "inner",
924                "other_data": [[1, "b"]],
925                "other_schema": [{"name": "id", "type": "bigint"}, {"name": "x", "type": "string"}]
926            }
927        })];
928        let result = execute_plan(&session, data, schema, &plan);
929        let err = match result {
930            Ok(_) => panic!("join on expression should fail"),
931            Err(e) => e,
932        };
933        let msg = err.to_string();
934        assert!(
935            msg.contains("join on expression") || msg.contains("use column names only"),
936            "error should explain join-on expression not supported: {}",
937            msg
938        );
939    }
940
941    #[test]
942    fn test_order_by_col_desc_and_list_format() {
943        let session = crate::session::SparkSession::builder()
944            .app_name("plan_orderby_desc")
945            .get_or_create();
946        let data = vec![
947            vec![json!(1), json!("z")],
948            vec![json!(2), json!("a")],
949            vec![json!(3), json!("m")],
950        ];
951        let schema = vec![
952            ("id".to_string(), "bigint".to_string()),
953            ("name".to_string(), "string".to_string()),
954        ];
955        let plan = vec![json!({
956            "op": "orderBy",
957            "payload": { "columns": ["name DESC"] }
958        })];
959        let df = execute_plan(&session, data.clone(), schema.clone(), &plan).unwrap();
960        assert_eq!(df.count().unwrap(), 3);
961        let rows = df.collect_as_json_rows().unwrap();
962        assert_eq!(rows[0].get("name").and_then(|v| v.as_str()), Some("z"));
963        let plan2 = vec![json!({
964            "op": "orderBy",
965            "payload": { "columns": ["['id','name']"] }
966        })];
967        let df2 = execute_plan(&session, data, schema, &plan2).unwrap();
968        assert_eq!(df2.collect_inner().unwrap().height(), 3);
969    }
970
971    /// PR-1/#860,#874,#876: sin, cos, tan are supported in plan (expr_from_value); regression test.
972    #[test]
973    fn test_plan_select_sin_cos_tan() {
974        let session = crate::session::SparkSession::builder()
975            .app_name("plan_trig")
976            .get_or_create();
977        let pi_2 = std::f64::consts::FRAC_PI_2;
978        let data = vec![vec![json!(0.0)], vec![json!(pi_2)]];
979        let schema = vec![("x".to_string(), "double".to_string())];
980        let plan = vec![json!({
981            "op": "select",
982            "payload": [
983                {"name": "x", "expr": {"col": "x"}},
984                {"name": "s", "expr": {"fn": "sin", "args": [{"col": "x"}]}},
985                {"name": "c", "expr": {"fn": "cos", "args": [{"col": "x"}]}},
986                {"name": "t", "expr": {"fn": "tan", "args": [{"col": "x"}]}}
987            ]
988        })];
989        let df = execute_plan(&session, data, schema, &plan).unwrap();
990        assert_eq!(df.count().unwrap(), 2);
991        let out = df.collect_inner().unwrap();
992        assert_eq!(out.height(), 2);
993        // sin(0)=0, cos(0)=1, tan(0)=0; sin(pi/2)=1, cos(pi/2)~0
994        let s_col = out.column("s").unwrap();
995        let c_col = out.column("c").unwrap();
996        assert_eq!(s_col.f64().unwrap().get(0), Some(0.0));
997        assert!((c_col.f64().unwrap().get(0).unwrap() - 1.0).abs() < 1e-10);
998        assert!((s_col.f64().unwrap().get(1).unwrap() - 1.0).abs() < 1e-10);
999    }
1000
1001    /// PR-5/#873,#884: select and drop accept column refs as {"col": "name"} (Sparkless PyColumn).
1002    #[test]
1003    fn test_plan_select_drop_column_ref_objects() {
1004        let session = crate::session::SparkSession::builder()
1005            .app_name("plan_select_col_ref")
1006            .get_or_create();
1007        let data = vec![
1008            vec![json!(1), json!("x"), json!(10)],
1009            vec![json!(2), json!("y"), json!(20)],
1010        ];
1011        let schema = vec![
1012            ("a".to_string(), "bigint".to_string()),
1013            ("b".to_string(), "string".to_string()),
1014            ("c".to_string(), "bigint".to_string()),
1015        ];
1016        // Select using [{"col": "a"}, {"col": "b"}] instead of ["a", "b"]
1017        let plan_select = vec![json!({
1018            "op": "select",
1019            "payload": [{"col": "a"}, {"col": "b"}]
1020        })];
1021        let df = execute_plan(&session, data.clone(), schema.clone(), &plan_select).unwrap();
1022        let out = df.collect_inner().unwrap();
1023        assert_eq!(out.get_column_names(), &["a", "b"]);
1024        assert_eq!(out.height(), 2);
1025        // Drop using [{"col": "b"}] to leave a, c
1026        let plan_drop = vec![
1027            json!({"op": "select", "payload": [{"col": "a"}, {"col": "b"}, {"col": "c"}]}),
1028            json!({"op": "drop", "payload": {"columns": [{"col": "b"}]}}),
1029        ];
1030        let df2 = execute_plan(&session, data, schema, &plan_drop).unwrap();
1031        let out2 = df2.collect_inner().unwrap();
1032        assert_eq!(out2.get_column_names(), &["a", "c"]);
1033    }
1034
1035    /// Batch 6 / #714: create_dataframe_from_rows with null in rows then plan filter/select.
1036    #[test]
1037    fn test_plan_create_dataframe_from_rows_with_nulls() {
1038        let session = crate::session::SparkSession::builder()
1039            .app_name("plan_nulls")
1040            .get_or_create();
1041        let data = vec![
1042            vec![json!(1), json!("a"), serde_json::Value::Null],
1043            vec![json!(2), serde_json::Value::Null, json!(20)],
1044        ];
1045        let schema = vec![
1046            ("id".to_string(), "bigint".to_string()),
1047            ("name".to_string(), "string".to_string()),
1048            ("value".to_string(), "bigint".to_string()),
1049        ];
1050        let plan = vec![
1051            json!({"op": "filter", "payload": {"op": "gt", "left": {"col": "id"}, "right": {"lit": 0}}}),
1052            json!({"op": "select", "payload": ["id", "name", "value"]}),
1053        ];
1054        let df = execute_plan(&session, data, schema, &plan).unwrap();
1055        assert_eq!(df.count().unwrap(), 2);
1056        let rows = df.collect_as_json_rows().unwrap();
1057        assert_eq!(rows[0].get("value"), Some(&serde_json::Value::Null));
1058        assert_eq!(rows[1].get("name"), Some(&serde_json::Value::Null));
1059    }
1060
1061    /// Ensure the core PlanExecutor trait implementation stays in sync with execute_plan.
1062    #[test]
1063    fn test_plan_executor_trait_matches_execute_plan() {
1064        use robin_sparkless_core::engine::PlanExecutor as _;
1065
1066        let session = crate::session::SparkSession::builder()
1067            .app_name("plan_executor_trait")
1068            .get_or_create();
1069        let data = vec![vec![json!(1)], vec![json!(2)]];
1070        let schema = vec![("x".to_string(), "bigint".to_string())];
1071        let plan = vec![json!({
1072            "op": "filter",
1073            "payload": {"op": "gt", "left": {"col": "x"}, "right": {"lit": 1}}
1074        })];
1075
1076        // Direct call.
1077        let df_direct = execute_plan(&session, data.clone(), schema.clone(), &plan).unwrap();
1078
1079        // Trait-based call returns boxed backend; downcast to DataFrame and compare.
1080        let boxed = PolarsPlanExecutor::execute_plan(&session, data, schema, &plan).unwrap();
1081        let backend_df = boxed
1082            .as_any()
1083            .downcast_ref::<crate::dataframe::DataFrame>()
1084            .expect("expected Polars DataFrame backend")
1085            .clone();
1086
1087        assert_eq!(
1088            df_direct.collect_inner().unwrap(),
1089            backend_df.collect_inner().unwrap()
1090        );
1091    }
1092}